NIFI-1083: Added a Grouping Regular Expression property for grouping lines of text

This commit is contained in:
Mark Payne 2015-11-05 10:36:38 -05:00
parent 3e538d9007
commit e7c6c7cae9
2 changed files with 284 additions and 19 deletions

View File

@ -32,8 +32,10 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship; import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
@ -74,6 +76,7 @@ import org.apache.nifi.processors.standard.util.NLKBufferedReader;
public class RouteText extends AbstractProcessor { public class RouteText extends AbstractProcessor {
public static final String ROUTE_ATTRIBUTE_KEY = "RouteText.Route"; public static final String ROUTE_ATTRIBUTE_KEY = "RouteText.Route";
public static final String GROUP_ATTRIBUTE_KEY = "RouteText.Group";
private static final String routeAllMatchValue = "Route to 'matched' if line matches all conditions"; private static final String routeAllMatchValue = "Route to 'matched' if line matches all conditions";
private static final String routeAnyMatchValue = "Route to 'matched' if lines matches any condition"; private static final String routeAnyMatchValue = "Route to 'matched' if lines matches any condition";
@ -127,7 +130,7 @@ public class RouteText extends AbstractProcessor {
public static final PropertyDescriptor TRIM_WHITESPACE = new PropertyDescriptor.Builder() public static final PropertyDescriptor TRIM_WHITESPACE = new PropertyDescriptor.Builder()
.name("Ignore Leading/Trailing Whitespace") .name("Ignore Leading/Trailing Whitespace")
.description("Indicates whether or the whitespace at the beginning and end of the lines should be ignored when evaluating the line.") .description("Indicates whether or not the whitespace at the beginning and end of the lines should be ignored when evaluating the line.")
.required(true) .required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR) .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("true") .defaultValue("true")
@ -143,6 +146,18 @@ public class RouteText extends AbstractProcessor {
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor GROUPING_REGEX = new PropertyDescriptor.Builder()
.name("Grouping Regular Expression")
.description("Specifies a Regular Expression to evaluate against each line to determine which Group the line should be placed in. "
+ "The Regular Expression must have at least one Capturing Group that defines the line's Group. If multiple Capturing Groups exist in the Regular Expression, the Group from all "
+ "Capturing Groups. Two lines will not be placed into the same FlowFile unless the they both have the same value for the Group "
+ "(or neither line matches the Regular Expression). For example, to group together all lines in a CSV File by the first column, we can set this value to \"(.*?),.*\". "
+ "Two lines that have the same Group but different Relationships will never be placed into the same FlowFile.")
.addValidator(StandardValidators.createRegexValidator(1, Integer.MAX_VALUE, false))
.expressionLanguageSupported(false)
.required(false)
.build();
public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set") .name("Character Set")
.description("The Character Set in which the incoming text is encoded") .description("The Character Set in which the incoming text is encoded")
@ -164,6 +179,8 @@ public class RouteText extends AbstractProcessor {
.description("Data that satisfies the required user-defined rules will be routed to this Relationship") .description("Data that satisfies the required user-defined rules will be routed to this Relationship")
.build(); .build();
private static Group EMPTY_GROUP = new Group(Collections.<String> emptyList());
private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>(); private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue(); private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue();
@ -174,6 +191,7 @@ public class RouteText extends AbstractProcessor {
* {@link #onTrigger(ProcessContext, ProcessSession)} * {@link #onTrigger(ProcessContext, ProcessSession)}
*/ */
private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>(); private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
private volatile Pattern groupingRegex = null;
@Override @Override
protected void init(final ProcessorInitializationContext context) { protected void init(final ProcessorInitializationContext context) {
@ -188,6 +206,7 @@ public class RouteText extends AbstractProcessor {
properties.add(CHARACTER_SET); properties.add(CHARACTER_SET);
properties.add(TRIM_WHITESPACE); properties.add(TRIM_WHITESPACE);
properties.add(IGNORE_CASE); properties.add(IGNORE_CASE);
properties.add(GROUPING_REGEX);
this.properties = Collections.unmodifiableList(properties); this.properties = Collections.unmodifiableList(properties);
} }
@ -239,6 +258,7 @@ public class RouteText extends AbstractProcessor {
newRelationships.add(REL_MATCH); newRelationships.add(REL_MATCH);
} }
newRelationships.add(REL_ORIGINAL);
newRelationships.add(REL_NO_MATCH); newRelationships.add(REL_NO_MATCH);
this.relationships.set(newRelationships); this.relationships.set(newRelationships);
} }
@ -251,6 +271,11 @@ public class RouteText extends AbstractProcessor {
*/ */
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
final String regex = context.getProperty(GROUPING_REGEX).getValue();
if (regex != null) {
groupingRegex = Pattern.compile(regex);
}
final Map<Relationship, PropertyValue> newPropertyMap = new HashMap<>(); final Map<Relationship, PropertyValue> newPropertyMap = new HashMap<>();
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (!descriptor.isDynamic()) { if (!descriptor.isDynamic()) {
@ -326,21 +351,43 @@ public class RouteText extends AbstractProcessor {
propValueMap.put(entry.getKey(), compileRegex ? compiledRegex : value); propValueMap.put(entry.getKey(), compileRegex ? compiledRegex : value);
} }
final Map<Relationship, FlowFile> flowFileMap = new HashMap<>(); final Map<Relationship, Map<Group, FlowFile>> flowFileMap = new HashMap<>();
final Pattern groupPattern = groupingRegex;
session.read(originalFlowFile, new InputStreamCallback() { session.read(originalFlowFile, new InputStreamCallback() {
@Override @Override
public void process(final InputStream in) throws IOException { public void process(final InputStream in) throws IOException {
try (final Reader inReader = new InputStreamReader(in,charset); try (final Reader inReader = new InputStreamReader(in, charset);
final NLKBufferedReader reader = new NLKBufferedReader(inReader)) { final NLKBufferedReader reader = new NLKBufferedReader(inReader)) {
String line; String line;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
final String matchLine;
if (trim) {
matchLine = line.trim();
} else {
// Always trim off the new-line and carriage return characters before evaluating the line.
// The NLKBufferedReader maintains these characters so that when we write the line out we can maintain
// these characters. However, we don't actually want to match against these characters.
final String lineWithoutEndings;
final int indexOfCR = line.indexOf("\r");
final int indexOfNL = line.indexOf("\n");
if (indexOfCR > 0 && indexOfNL > 0) {
lineWithoutEndings = line.substring(0, Math.min(indexOfCR, indexOfNL));
} else if (indexOfCR > 0) {
lineWithoutEndings = line.substring(0, indexOfCR);
} else if (indexOfNL > 0) {
lineWithoutEndings = line.substring(0, indexOfNL);
} else {
lineWithoutEndings = line;
}
matchLine = lineWithoutEndings;
}
int propertiesThatMatchedLine = 0; int propertiesThatMatchedLine = 0;
for (final Map.Entry<Relationship, Object> entry : propValueMap.entrySet()) { for (final Map.Entry<Relationship, Object> entry : propValueMap.entrySet()) {
String matchLine = trim ? line.trim() : line;
boolean lineMatchesProperty = lineMatches(matchLine, entry.getValue(), context.getProperty(MATCH_STRATEGY).getValue(), ignoreCase); boolean lineMatchesProperty = lineMatches(matchLine, entry.getValue(), context.getProperty(MATCH_STRATEGY).getValue(), ignoreCase);
if (lineMatchesProperty) { if (lineMatchesProperty) {
propertiesThatMatchedLine++; propertiesThatMatchedLine++;
@ -349,7 +396,9 @@ public class RouteText extends AbstractProcessor {
if (lineMatchesProperty && ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy)) { if (lineMatchesProperty && ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy)) {
// route each individual line to each Relationship that matches. This one matches. // route each individual line to each Relationship that matches. This one matches.
final Relationship relationship = entry.getKey(); final Relationship relationship = entry.getKey();
appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset);
final Group group = getGroup(matchLine, groupPattern);
appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset, group);
continue; continue;
} }
@ -377,19 +426,32 @@ public class RouteText extends AbstractProcessor {
} }
if (relationship != null) { if (relationship != null) {
appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset); final Group group = getGroup(matchLine, groupPattern);
appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset, group);
} }
} }
} }
} }
}); });
for (final Map.Entry<Relationship, FlowFile> entry : flowFileMap.entrySet()) { for (final Map.Entry<Relationship, Map<Group, FlowFile>> entry : flowFileMap.entrySet()) {
logger.info("Created {} from {}; routing to relationship {}", new Object[] {entry.getValue(), originalFlowFile, entry.getKey()}); final Relationship relationship = entry.getKey();
FlowFile updatedFlowFile = session.putAttribute(entry.getValue(), ROUTE_ATTRIBUTE_KEY, entry.getKey().getName()); final Map<Group, FlowFile> groupToFlowFileMap = entry.getValue();
for (final Map.Entry<Group, FlowFile> flowFileEntry : groupToFlowFileMap.entrySet()) {
final Group group = flowFileEntry.getKey();
final FlowFile flowFile = flowFileEntry.getValue();
final Map<String, String> attributes = new HashMap<>(2);
attributes.put(ROUTE_ATTRIBUTE_KEY, relationship.getName());
attributes.put(GROUP_ATTRIBUTE_KEY, StringUtils.join(group.getCapturedValues(), ", "));
logger.info("Created {} from {}; routing to relationship {}", new Object[] {flowFile, originalFlowFile, relationship.getName()});
FlowFile updatedFlowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().route(updatedFlowFile, entry.getKey()); session.getProvenanceReporter().route(updatedFlowFile, entry.getKey());
session.transfer(updatedFlowFile, entry.getKey()); session.transfer(updatedFlowFile, entry.getKey());
} }
}
// now transfer the original flow file // now transfer the original flow file
FlowFile flowFile = originalFlowFile; FlowFile flowFile = originalFlowFile;
@ -400,9 +462,33 @@ public class RouteText extends AbstractProcessor {
} }
private void appendLine(final ProcessSession session, final Map<Relationship, FlowFile> flowFileMap, private Group getGroup(final String line, final Pattern groupPattern) {
final Relationship relationship, final FlowFile original, final String line, final Charset charset) { if (groupPattern == null) {
FlowFile flowFile = flowFileMap.get(relationship); return EMPTY_GROUP;
} else {
final Matcher matcher = groupPattern.matcher(line);
if (matcher.matches()) {
final List<String> capturingGroupValues = new ArrayList<>(matcher.groupCount());
for (int i = 1; i <= matcher.groupCount(); i++) {
capturingGroupValues.add(matcher.group(i));
}
return new Group(capturingGroupValues);
} else {
return EMPTY_GROUP;
}
}
}
private void appendLine(final ProcessSession session, final Map<Relationship, Map<Group, FlowFile>> flowFileMap, final Relationship relationship,
final FlowFile original, final String line, final Charset charset, final Group group) {
Map<Group, FlowFile> groupToFlowFileMap = flowFileMap.get(relationship);
if (groupToFlowFileMap == null) {
groupToFlowFileMap = new HashMap<>();
flowFileMap.put(relationship, groupToFlowFileMap);
}
FlowFile flowFile = groupToFlowFileMap.get(group);
if (flowFile == null) { if (flowFile == null) {
flowFile = session.create(original); flowFile = session.create(original);
} }
@ -414,7 +500,7 @@ public class RouteText extends AbstractProcessor {
} }
}); });
flowFileMap.put(relationship, flowFile); groupToFlowFileMap.put(group, flowFile);
} }
@ -436,4 +522,56 @@ public class RouteText extends AbstractProcessor {
return false; return false;
} }
private static class Group {
private final List<String> capturedValues;
public Group(final List<String> capturedValues) {
this.capturedValues = capturedValues;
}
public List<String> getCapturedValues() {
return capturedValues;
}
@Override
public String toString() {
return "Group" + capturedValues;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((capturedValues == null) ? 0 : capturedValues.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Group other = (Group) obj;
if (capturedValues == null) {
if (other.capturedValues != null) {
return false;
}
} else if (!capturedValues.equals(other.capturedValues)) {
return false;
}
return true;
}
}
} }

View File

@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
@ -45,7 +46,7 @@ public class TestRouteText {
runner.run(); runner.run();
Set<Relationship> relationshipSet = runner.getProcessor().getRelationships(); Set<Relationship> relationshipSet = runner.getProcessor().getRelationships();
Set<String> expectedRelationships = new HashSet<>(Arrays.asList("matched", "unmatched")); Set<String> expectedRelationships = new HashSet<>(Arrays.asList("matched", "unmatched", "original"));
assertEquals(expectedRelationships.size(), relationshipSet.size()); assertEquals(expectedRelationships.size(), relationshipSet.size());
for (Relationship relationship : relationshipSet) { for (Relationship relationship : relationshipSet) {
@ -56,7 +57,7 @@ public class TestRouteText {
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHING_PROPERTY_NAME); runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHING_PROPERTY_NAME);
relationshipSet = runner.getProcessor().getRelationships(); relationshipSet = runner.getProcessor().getRelationships();
expectedRelationships = new HashSet<>(Arrays.asList("simple", "unmatched")); expectedRelationships = new HashSet<>(Arrays.asList("simple", "unmatched", "original"));
assertEquals(expectedRelationships.size(), relationshipSet.size()); assertEquals(expectedRelationships.size(), relationshipSet.size());
for (Relationship relationship : relationshipSet) { for (Relationship relationship : relationshipSet) {
@ -81,7 +82,7 @@ public class TestRouteText {
runner.setProperty("simple", "start"); runner.setProperty("simple", "start");
Set<Relationship> relationshipSet = runner.getProcessor().getRelationships(); Set<Relationship> relationshipSet = runner.getProcessor().getRelationships();
Set<String> expectedRelationships = new HashSet<>(Arrays.asList("simple", "unmatched")); Set<String> expectedRelationships = new HashSet<>(Arrays.asList("simple", "unmatched", "original"));
assertEquals(expectedRelationships.size(), relationshipSet.size()); assertEquals(expectedRelationships.size(), relationshipSet.size());
for (Relationship relationship : relationshipSet) { for (Relationship relationship : relationshipSet) {
@ -174,6 +175,132 @@ public class TestRouteText {
runner.getFlowFilesForRelationship("original").get(0).assertContentEquals(originalText); runner.getFlowFilesForRelationship("original").get(0).assertContentEquals(originalText);
} }
@Test
public void testGroupSameRelationship() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),.*");
runner.setProperty("o", "o");
final String originalText = "1,hello\n2,world\n1,good-bye";
runner.enqueue(originalText.getBytes("UTF-8"));
runner.run();
runner.assertTransferCount("o", 2);
runner.assertTransferCount("unmatched", 0);
runner.assertTransferCount("original", 1);
final List<MockFlowFile> list = runner.getFlowFilesForRelationship("o");
boolean found1 = false;
boolean found2 = false;
for (final MockFlowFile mff : list) {
if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) {
mff.assertContentEquals("1,hello\n1,good-bye");
found1 = true;
} else {
mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "2");
mff.assertContentEquals("2,world\n");
found2 = true;
}
}
assertTrue(found1);
assertTrue(found2);
}
@Test
public void testMultipleGroupsSameRelationship() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),(.*?),.*");
runner.setProperty("o", "o");
final String originalText = "1,5,hello\n2,5,world\n1,8,good-bye\n1,5,overt";
runner.enqueue(originalText.getBytes("UTF-8"));
runner.run();
runner.assertTransferCount("o", 3);
runner.assertTransferCount("unmatched", 0);
runner.assertTransferCount("original", 1);
final List<MockFlowFile> list = runner.getFlowFilesForRelationship("o");
boolean found1 = false;
boolean found2 = false;
boolean found3 = false;
for (final MockFlowFile mff : list) {
if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1, 5")) {
mff.assertContentEquals("1,5,hello\n1,5,overt");
found1 = true;
} else if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("2, 5")) {
mff.assertContentEquals("2,5,world\n");
found2 = true;
} else {
mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "1, 8");
mff.assertContentEquals("1,8,good-bye\n");
found3 = true;
}
}
assertTrue(found1);
assertTrue(found2);
assertTrue(found3);
}
@Test
public void testGroupDifferentRelationships() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),.*");
runner.setProperty("l", "l");
final String originalText = "1,hello\n2,world\n1,good-bye\n3,ciao";
runner.enqueue(originalText.getBytes("UTF-8"));
runner.run();
runner.assertTransferCount("l", 2);
runner.assertTransferCount("unmatched", 2);
runner.assertTransferCount("original", 1);
List<MockFlowFile> lFlowFiles = runner.getFlowFilesForRelationship("l");
boolean found1 = false;
boolean found2 = false;
for (final MockFlowFile mff : lFlowFiles) {
if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) {
mff.assertContentEquals("1,hello\n");
found1 = true;
} else {
mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "2");
mff.assertContentEquals("2,world\n");
found2 = true;
}
}
assertTrue(found1);
assertTrue(found2);
List<MockFlowFile> unmatchedFlowFiles = runner.getFlowFilesForRelationship("unmatched");
found1 = false;
boolean found3 = false;
for (final MockFlowFile mff : unmatchedFlowFiles) {
if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) {
mff.assertContentEquals("1,good-bye\n");
found1 = true;
} else {
mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "3");
mff.assertContentEquals("3,ciao");
found3 = true;
}
}
assertTrue(found1);
assertTrue(found3);
}
@Test @Test
public void testSimpleDefaultContains() throws IOException { public void testSimpleDefaultContains() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new RouteText()); final TestRunner runner = TestRunners.newTestRunner(new RouteText());