mirror of https://github.com/apache/nifi.git
Merge branch 'NIFI-1083'
This commit is contained in:
commit
11fcad90d1
|
@ -0,0 +1,577 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.DynamicRelationship;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ProcessorLog;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.standard.util.NLKBufferedReader;
|
||||
|
||||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@Tags({"attributes", "routing", "text", "regexp", "regex", "Regular Expression", "Expression Language"})
|
||||
@CapabilityDescription("Routes textual data based on a set of user-defined rules. Each line in an incoming FlowFile is compared against the values specified by user-defined Properties. "
|
||||
+ "The mechanism by which the text is compared to these user-defined properties is defined by the 'Matching Strategy'. The data is then routed according to these rules, routing "
|
||||
+ "each line of the text individually.")
|
||||
@DynamicProperty(name = "Relationship Name", value = "value to match against", description = "Routes data that matches the value specified in the Dynamic Property Value to the "
|
||||
+ "Relationship specified in the Dynamic Property Key.")
|
||||
@DynamicRelationship(name = "Name from Dynamic Property", description = "FlowFiles that match the Dynamic Property's value")
|
||||
public class RouteText extends AbstractProcessor {
|
||||
|
||||
public static final String ROUTE_ATTRIBUTE_KEY = "RouteText.Route";
|
||||
public static final String GROUP_ATTRIBUTE_KEY = "RouteText.Group";
|
||||
|
||||
private static final String routeAllMatchValue = "Route to 'matched' if line matches all conditions";
|
||||
private static final String routeAnyMatchValue = "Route to 'matched' if lines matches any condition";
|
||||
private static final String routePropertyNameValue = "Route to each matching Property Name";
|
||||
|
||||
private static final String startsWithValue = "Starts With";
|
||||
private static final String endsWithValue = "Ends With";
|
||||
private static final String containsValue = "Contains";
|
||||
private static final String equalsValue = "Equals";
|
||||
private static final String matchesRegularExpressionValue = "Matches Regular Expression";
|
||||
private static final String containsRegularExpressionValue = "Contains Regular Expression";
|
||||
|
||||
|
||||
public static final AllowableValue ROUTE_TO_MATCHING_PROPERTY_NAME = new AllowableValue(routePropertyNameValue, routePropertyNameValue,
|
||||
"Lines will be routed to each relationship whose corresponding expression evaluates to 'true'");
|
||||
public static final AllowableValue ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH = new AllowableValue(routeAllMatchValue, routeAllMatchValue,
|
||||
"Requires that all user-defined expressions evaluate to 'true' for the line to be considered a match");
|
||||
public static final AllowableValue ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES = new AllowableValue(routeAnyMatchValue, routeAnyMatchValue,
|
||||
"Requires that at least one user-defined expression evaluate to 'true' for the line to be considered a match");
|
||||
|
||||
public static final AllowableValue STARTS_WITH = new AllowableValue(startsWithValue, startsWithValue,
|
||||
"Match lines based on whether the line starts with the property value");
|
||||
public static final AllowableValue ENDS_WITH = new AllowableValue(endsWithValue, endsWithValue,
|
||||
"Match lines based on whether the line ends with the property value");
|
||||
public static final AllowableValue CONTAINS = new AllowableValue(containsValue, containsValue,
|
||||
"Match lines based on whether the line contains the property value");
|
||||
public static final AllowableValue EQUALS = new AllowableValue(equalsValue, equalsValue,
|
||||
"Match lines based on whether the line equals the property value");
|
||||
public static final AllowableValue MATCHES_REGULAR_EXPRESSION = new AllowableValue(matchesRegularExpressionValue, matchesRegularExpressionValue,
|
||||
"Match lines based on whether the line exactly matches the Regular Expression that is provided as the Property value");
|
||||
public static final AllowableValue CONTAINS_REGULAR_EXPRESSION = new AllowableValue(containsRegularExpressionValue, containsRegularExpressionValue,
|
||||
"Match lines based on whether the line contains some text that matches the Regular Expression that is provided as the Property value");
|
||||
|
||||
|
||||
public static final PropertyDescriptor ROUTE_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("Routing Strategy")
|
||||
.description("Specifies how to determine which Relationship(s) to use when evaluating the lines of incoming text against the 'Matching Strategy' and user-defined properties.")
|
||||
.required(true)
|
||||
.allowableValues(ROUTE_TO_MATCHING_PROPERTY_NAME, ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH, ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES)
|
||||
.defaultValue(ROUTE_TO_MATCHING_PROPERTY_NAME.getValue())
|
||||
.dynamic(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor MATCH_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("Matching Strategy")
|
||||
.description("Specifies how to evaluate each line of incoming text against the user-defined properties.")
|
||||
.required(true)
|
||||
.allowableValues(STARTS_WITH, ENDS_WITH, CONTAINS, EQUALS, MATCHES_REGULAR_EXPRESSION, CONTAINS_REGULAR_EXPRESSION)
|
||||
.dynamic(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor TRIM_WHITESPACE = new PropertyDescriptor.Builder()
|
||||
.name("Ignore Leading/Trailing Whitespace")
|
||||
.description("Indicates whether or not the whitespace at the beginning and end of the lines should be ignored when evaluating the line.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.defaultValue("true")
|
||||
.dynamic(false)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor IGNORE_CASE = new PropertyDescriptor.Builder()
|
||||
.name("Ignore Case")
|
||||
.description("If true, capitalization will not be taken into account when comparing values. E.g., matching against 'HELLO' or 'hello' will have the same result.")
|
||||
.expressionLanguageSupported(false)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("false")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor GROUPING_REGEX = new PropertyDescriptor.Builder()
|
||||
.name("Grouping Regular Expression")
|
||||
.description("Specifies a Regular Expression to evaluate against each line to determine which Group the line should be placed in. "
|
||||
+ "The Regular Expression must have at least one Capturing Group that defines the line's Group. If multiple Capturing Groups exist in the Regular Expression, the Group from all "
|
||||
+ "Capturing Groups. Two lines will not be placed into the same FlowFile unless the they both have the same value for the Group "
|
||||
+ "(or neither line matches the Regular Expression). For example, to group together all lines in a CSV File by the first column, we can set this value to \"(.*?),.*\". "
|
||||
+ "Two lines that have the same Group but different Relationships will never be placed into the same FlowFile.")
|
||||
.addValidator(StandardValidators.createRegexValidator(1, Integer.MAX_VALUE, false))
|
||||
.expressionLanguageSupported(false)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
|
||||
.name("Character Set")
|
||||
.description("The Character Set in which the incoming text is encoded")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||
.defaultValue("UTF-8")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
|
||||
.name("original")
|
||||
.description("The original input file will be routed to this destination when the lines have been successfully routed to 1 or more relationships")
|
||||
.build();
|
||||
public static final Relationship REL_NO_MATCH = new Relationship.Builder()
|
||||
.name("unmatched")
|
||||
.description("Data that does not satisfy the required user-defined rules will be routed to this Relationship")
|
||||
.build();
|
||||
public static final Relationship REL_MATCH = new Relationship.Builder()
|
||||
.name("matched")
|
||||
.description("Data that satisfies the required user-defined rules will be routed to this Relationship")
|
||||
.build();
|
||||
|
||||
private static Group EMPTY_GROUP = new Group(Collections.<String> emptyList());
|
||||
|
||||
private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
|
||||
private List<PropertyDescriptor> properties;
|
||||
private volatile String configuredRouteStrategy = ROUTE_STRATEGY.getDefaultValue();
|
||||
private volatile Set<String> dynamicPropertyNames = new HashSet<>();
|
||||
|
||||
/**
|
||||
* Cache of dynamic properties set during {@link #onScheduled(ProcessContext)} for quick access in
|
||||
* {@link #onTrigger(ProcessContext, ProcessSession)}
|
||||
*/
|
||||
private volatile Map<Relationship, PropertyValue> propertyMap = new HashMap<>();
|
||||
private volatile Pattern groupingRegex = null;
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final Set<Relationship> set = new HashSet<>();
|
||||
set.add(REL_ORIGINAL);
|
||||
set.add(REL_NO_MATCH);
|
||||
relationships = new AtomicReference<>(set);
|
||||
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(ROUTE_STRATEGY);
|
||||
properties.add(MATCH_STRATEGY);
|
||||
properties.add(CHARACTER_SET);
|
||||
properties.add(TRIM_WHITESPACE);
|
||||
properties.add(IGNORE_CASE);
|
||||
properties.add(GROUPING_REGEX);
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.required(false)
|
||||
.name(propertyDescriptorName)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.dynamic(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||
if (descriptor.equals(ROUTE_STRATEGY)) {
|
||||
configuredRouteStrategy = newValue;
|
||||
} else {
|
||||
final Set<String> newDynamicPropertyNames = new HashSet<>(dynamicPropertyNames);
|
||||
if (newValue == null) {
|
||||
newDynamicPropertyNames.remove(descriptor.getName());
|
||||
} else if (oldValue == null && descriptor.isDynamic()) { // new property
|
||||
newDynamicPropertyNames.add(descriptor.getName());
|
||||
}
|
||||
|
||||
this.dynamicPropertyNames = Collections.unmodifiableSet(newDynamicPropertyNames);
|
||||
}
|
||||
|
||||
// formulate the new set of Relationships
|
||||
final Set<String> allDynamicProps = this.dynamicPropertyNames;
|
||||
final Set<Relationship> newRelationships = new HashSet<>();
|
||||
final String routeStrategy = configuredRouteStrategy;
|
||||
if (ROUTE_TO_MATCHING_PROPERTY_NAME.equals(routeStrategy)) {
|
||||
for (final String propName : allDynamicProps) {
|
||||
newRelationships.add(new Relationship.Builder().name(propName).build());
|
||||
}
|
||||
} else {
|
||||
newRelationships.add(REL_MATCH);
|
||||
}
|
||||
|
||||
newRelationships.add(REL_ORIGINAL);
|
||||
newRelationships.add(REL_NO_MATCH);
|
||||
this.relationships.set(newRelationships);
|
||||
}
|
||||
|
||||
/**
|
||||
* When this processor is scheduled, update the dynamic properties into the map
|
||||
* for quick access during each onTrigger call
|
||||
*
|
||||
* @param context ProcessContext used to retrieve dynamic properties
|
||||
*/
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
final String regex = context.getProperty(GROUPING_REGEX).getValue();
|
||||
if (regex != null) {
|
||||
groupingRegex = Pattern.compile(regex);
|
||||
}
|
||||
|
||||
final Map<Relationship, PropertyValue> newPropertyMap = new HashMap<>();
|
||||
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
|
||||
if (!descriptor.isDynamic()) {
|
||||
continue;
|
||||
}
|
||||
getLogger().debug("Adding new dynamic property: {}", new Object[] {descriptor});
|
||||
newPropertyMap.put(new Relationship.Builder().name(descriptor.getName()).build(), context.getProperty(descriptor));
|
||||
}
|
||||
|
||||
this.propertyMap = newPropertyMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
Collection<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
|
||||
boolean dynamicProperty = false;
|
||||
|
||||
final String matchStrategy = validationContext.getProperty(MATCH_STRATEGY).getValue();
|
||||
final boolean compileRegex = matchStrategy.equals(matchesRegularExpressionValue) || matchStrategy.equals(containsRegularExpressionValue);
|
||||
Validator validator = null;
|
||||
if (compileRegex) {
|
||||
validator = StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true);
|
||||
}
|
||||
|
||||
Map<PropertyDescriptor, String> allProperties = validationContext.getProperties();
|
||||
for (final PropertyDescriptor descriptor : allProperties.keySet()) {
|
||||
if (descriptor.isDynamic()) {
|
||||
dynamicProperty = true;
|
||||
|
||||
if (compileRegex) {
|
||||
ValidationResult validationResult = validator.validate(descriptor.getName(), validationContext.getProperty(descriptor).getValue(), validationContext);
|
||||
if (validationResult != null) {
|
||||
results.add(validationResult);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!dynamicProperty) {
|
||||
results.add(new ValidationResult.Builder().subject("Dynamic Properties")
|
||||
.explanation("In order to route text there must be dynamic properties to match against").valid(false).build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) {
|
||||
final FlowFile originalFlowFile = session.get();
|
||||
if (originalFlowFile == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final ProcessorLog logger = getLogger();
|
||||
final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
|
||||
final boolean trim = context.getProperty(TRIM_WHITESPACE).asBoolean();
|
||||
final String routeStrategy = context.getProperty(ROUTE_STRATEGY).getValue();
|
||||
final String matchStrategy = context.getProperty(MATCH_STRATEGY).getValue();
|
||||
final boolean ignoreCase = context.getProperty(IGNORE_CASE).asBoolean();
|
||||
|
||||
final Map<Relationship, PropertyValue> propMap = this.propertyMap;
|
||||
final Map<Relationship, Object> propValueMap = new HashMap<>(propMap.size());
|
||||
|
||||
final boolean compileRegex = matchStrategy.equals(matchesRegularExpressionValue) || matchStrategy.equals(containsRegularExpressionValue);
|
||||
|
||||
for (final Map.Entry<Relationship, PropertyValue> entry : propMap.entrySet()) {
|
||||
final String value = entry.getValue().evaluateAttributeExpressions(originalFlowFile).getValue();
|
||||
|
||||
Pattern compiledRegex = null;
|
||||
if (compileRegex) {
|
||||
compiledRegex = ignoreCase ? Pattern.compile(value, Pattern.CASE_INSENSITIVE) : Pattern.compile(value);
|
||||
}
|
||||
propValueMap.put(entry.getKey(), compileRegex ? compiledRegex : value);
|
||||
}
|
||||
|
||||
final Map<Relationship, Map<Group, FlowFile>> flowFileMap = new HashMap<>();
|
||||
final Pattern groupPattern = groupingRegex;
|
||||
|
||||
session.read(originalFlowFile, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(final InputStream in) throws IOException {
|
||||
try (final Reader inReader = new InputStreamReader(in, charset);
|
||||
final NLKBufferedReader reader = new NLKBufferedReader(inReader)) {
|
||||
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
|
||||
final String matchLine;
|
||||
if (trim) {
|
||||
matchLine = line.trim();
|
||||
} else {
|
||||
// Always trim off the new-line and carriage return characters before evaluating the line.
|
||||
// The NLKBufferedReader maintains these characters so that when we write the line out we can maintain
|
||||
// these characters. However, we don't actually want to match against these characters.
|
||||
final String lineWithoutEndings;
|
||||
final int indexOfCR = line.indexOf("\r");
|
||||
final int indexOfNL = line.indexOf("\n");
|
||||
if (indexOfCR > 0 && indexOfNL > 0) {
|
||||
lineWithoutEndings = line.substring(0, Math.min(indexOfCR, indexOfNL));
|
||||
} else if (indexOfCR > 0) {
|
||||
lineWithoutEndings = line.substring(0, indexOfCR);
|
||||
} else if (indexOfNL > 0) {
|
||||
lineWithoutEndings = line.substring(0, indexOfNL);
|
||||
} else {
|
||||
lineWithoutEndings = line;
|
||||
}
|
||||
|
||||
matchLine = lineWithoutEndings;
|
||||
}
|
||||
|
||||
int propertiesThatMatchedLine = 0;
|
||||
for (final Map.Entry<Relationship, Object> entry : propValueMap.entrySet()) {
|
||||
boolean lineMatchesProperty = lineMatches(matchLine, entry.getValue(), context.getProperty(MATCH_STRATEGY).getValue(), ignoreCase);
|
||||
if (lineMatchesProperty) {
|
||||
propertiesThatMatchedLine++;
|
||||
}
|
||||
|
||||
if (lineMatchesProperty && ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy)) {
|
||||
// route each individual line to each Relationship that matches. This one matches.
|
||||
final Relationship relationship = entry.getKey();
|
||||
|
||||
final Group group = getGroup(matchLine, groupPattern);
|
||||
appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset, group);
|
||||
continue;
|
||||
}
|
||||
|
||||
// break as soon as possible to avoid calculating things we don't need to calculate.
|
||||
if (lineMatchesProperty && ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES.getValue().equals(routeStrategy)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!lineMatchesProperty && ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH.getValue().equals(routeStrategy)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
final Relationship relationship;
|
||||
if (ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy) && propertiesThatMatchedLine > 0) {
|
||||
// Set relationship to null so that we do not append the line to each FlowFile again. #appendLine is called
|
||||
// above within the loop, as the line may need to go to multiple different FlowFiles.
|
||||
relationship = null;
|
||||
} else if (ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES.getValue().equals(routeStrategy) && propertiesThatMatchedLine > 0) {
|
||||
relationship = REL_MATCH;
|
||||
} else if (ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH.getValue().equals(routeStrategy) && propertiesThatMatchedLine == propValueMap.size()) {
|
||||
relationship = REL_MATCH;
|
||||
} else {
|
||||
relationship = REL_NO_MATCH;
|
||||
}
|
||||
|
||||
if (relationship != null) {
|
||||
final Group group = getGroup(matchLine, groupPattern);
|
||||
appendLine(session, flowFileMap, relationship, originalFlowFile, line, charset, group);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
for (final Map.Entry<Relationship, Map<Group, FlowFile>> entry : flowFileMap.entrySet()) {
|
||||
final Relationship relationship = entry.getKey();
|
||||
final Map<Group, FlowFile> groupToFlowFileMap = entry.getValue();
|
||||
|
||||
for (final Map.Entry<Group, FlowFile> flowFileEntry : groupToFlowFileMap.entrySet()) {
|
||||
final Group group = flowFileEntry.getKey();
|
||||
final FlowFile flowFile = flowFileEntry.getValue();
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>(2);
|
||||
attributes.put(ROUTE_ATTRIBUTE_KEY, relationship.getName());
|
||||
attributes.put(GROUP_ATTRIBUTE_KEY, StringUtils.join(group.getCapturedValues(), ", "));
|
||||
|
||||
logger.info("Created {} from {}; routing to relationship {}", new Object[] {flowFile, originalFlowFile, relationship.getName()});
|
||||
FlowFile updatedFlowFile = session.putAllAttributes(flowFile, attributes);
|
||||
session.getProvenanceReporter().route(updatedFlowFile, entry.getKey());
|
||||
session.transfer(updatedFlowFile, entry.getKey());
|
||||
}
|
||||
}
|
||||
|
||||
// now transfer the original flow file
|
||||
FlowFile flowFile = originalFlowFile;
|
||||
logger.info("Routing {} to {}", new Object[] {flowFile, REL_ORIGINAL});
|
||||
session.getProvenanceReporter().route(originalFlowFile, REL_ORIGINAL);
|
||||
flowFile = session.putAttribute(flowFile, ROUTE_ATTRIBUTE_KEY, REL_ORIGINAL.getName());
|
||||
session.transfer(flowFile, REL_ORIGINAL);
|
||||
}
|
||||
|
||||
|
||||
private Group getGroup(final String line, final Pattern groupPattern) {
|
||||
if (groupPattern == null) {
|
||||
return EMPTY_GROUP;
|
||||
} else {
|
||||
final Matcher matcher = groupPattern.matcher(line);
|
||||
if (matcher.matches()) {
|
||||
final List<String> capturingGroupValues = new ArrayList<>(matcher.groupCount());
|
||||
for (int i = 1; i <= matcher.groupCount(); i++) {
|
||||
capturingGroupValues.add(matcher.group(i));
|
||||
}
|
||||
return new Group(capturingGroupValues);
|
||||
} else {
|
||||
return EMPTY_GROUP;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void appendLine(final ProcessSession session, final Map<Relationship, Map<Group, FlowFile>> flowFileMap, final Relationship relationship,
|
||||
final FlowFile original, final String line, final Charset charset, final Group group) {
|
||||
|
||||
Map<Group, FlowFile> groupToFlowFileMap = flowFileMap.get(relationship);
|
||||
if (groupToFlowFileMap == null) {
|
||||
groupToFlowFileMap = new HashMap<>();
|
||||
flowFileMap.put(relationship, groupToFlowFileMap);
|
||||
}
|
||||
|
||||
FlowFile flowFile = groupToFlowFileMap.get(group);
|
||||
if (flowFile == null) {
|
||||
flowFile = session.create(original);
|
||||
}
|
||||
|
||||
flowFile = session.append(flowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(final OutputStream out) throws IOException {
|
||||
out.write(line.getBytes(charset));
|
||||
}
|
||||
});
|
||||
|
||||
groupToFlowFileMap.put(group, flowFile);
|
||||
}
|
||||
|
||||
|
||||
protected static boolean lineMatches(final String line, final Object comparison, final String matchingStrategy, final boolean ignoreCase) {
|
||||
switch (matchingStrategy) {
|
||||
case startsWithValue:
|
||||
return line.toLowerCase().startsWith(((String) comparison).toLowerCase());
|
||||
case endsWithValue:
|
||||
return line.toLowerCase().endsWith(((String) comparison).toLowerCase());
|
||||
case containsValue:
|
||||
return line.toLowerCase().contains(((String) comparison).toLowerCase());
|
||||
case equalsValue:
|
||||
return line.equalsIgnoreCase((String) comparison);
|
||||
case matchesRegularExpressionValue:
|
||||
return ((Pattern) comparison).matcher(line).matches();
|
||||
case containsRegularExpressionValue:
|
||||
return ((Pattern) comparison).matcher(line).find();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
private static class Group {
|
||||
private final List<String> capturedValues;
|
||||
|
||||
public Group(final List<String> capturedValues) {
|
||||
this.capturedValues = capturedValues;
|
||||
}
|
||||
|
||||
public List<String> getCapturedValues() {
|
||||
return capturedValues;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Group" + capturedValues;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + ((capturedValues == null) ? 0 : capturedValues.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Group other = (Group) obj;
|
||||
if (capturedValues == null) {
|
||||
if (other.capturedValues != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!capturedValues.equals(other.capturedValues)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -95,7 +95,7 @@ public class NLKBufferedReader extends BufferedReader {
|
|||
for (i = nextChar; i < nChars; i++) {
|
||||
c = cb[i];
|
||||
if ((c == '\n') || (c == '\r')) {
|
||||
if (cb[i + 1] == '\n') { // windows case '\r\n' here verify the next character i+1
|
||||
if ((c == '\r') && (cb.length > i + 1) && cb[i + 1] == '\n') { // windows case '\r\n' here verify the next character i+1
|
||||
i++;
|
||||
}
|
||||
eol = true;
|
||||
|
|
|
@ -61,6 +61,7 @@ org.apache.nifi.processors.standard.PutSFTP
|
|||
org.apache.nifi.processors.standard.PutSQL
|
||||
org.apache.nifi.processors.standard.PutSyslog
|
||||
org.apache.nifi.processors.standard.ReplaceText
|
||||
org.apache.nifi.processors.standard.RouteText
|
||||
org.apache.nifi.processors.standard.ReplaceTextWithMapping
|
||||
org.apache.nifi.processors.standard.RouteOnAttribute
|
||||
org.apache.nifi.processors.standard.RouteOnContent
|
||||
|
|
|
@ -0,0 +1,717 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestRouteText {
|
||||
|
||||
@Test
|
||||
public void testRelationships() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.STARTS_WITH);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES);
|
||||
runner.setProperty("simple", "start");
|
||||
|
||||
runner.run();
|
||||
|
||||
Set<Relationship> relationshipSet = runner.getProcessor().getRelationships();
|
||||
Set<String> expectedRelationships = new HashSet<>(Arrays.asList("matched", "unmatched", "original"));
|
||||
|
||||
assertEquals(expectedRelationships.size(), relationshipSet.size());
|
||||
for (Relationship relationship : relationshipSet) {
|
||||
assertTrue(expectedRelationships.contains(relationship.getName()));
|
||||
}
|
||||
|
||||
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHING_PROPERTY_NAME);
|
||||
|
||||
relationshipSet = runner.getProcessor().getRelationships();
|
||||
expectedRelationships = new HashSet<>(Arrays.asList("simple", "unmatched", "original"));
|
||||
|
||||
assertEquals(expectedRelationships.size(), relationshipSet.size());
|
||||
for (Relationship relationship : relationshipSet) {
|
||||
assertTrue(expectedRelationships.contains(relationship.getName()));
|
||||
}
|
||||
|
||||
runner.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeparationStrategyNotKnown() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.STARTS_WITH);
|
||||
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotText() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.STARTS_WITH);
|
||||
runner.setProperty("simple", "start");
|
||||
|
||||
Set<Relationship> relationshipSet = runner.getProcessor().getRelationships();
|
||||
Set<String> expectedRelationships = new HashSet<>(Arrays.asList("simple", "unmatched", "original"));
|
||||
|
||||
assertEquals(expectedRelationships.size(), relationshipSet.size());
|
||||
for (Relationship relationship : relationshipSet) {
|
||||
assertTrue(expectedRelationships.contains(relationship.getName()));
|
||||
}
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/simple.jpg"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outOriginal = runner.getFlowFilesForRelationship("original").get(0);
|
||||
outOriginal.assertContentEquals(Paths.get("src/test/resources/simple.jpg"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidRegex() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.MATCHES_REGULAR_EXPRESSION);
|
||||
runner.setProperty("simple", "[");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
try {
|
||||
runner.run();
|
||||
fail();
|
||||
} catch (AssertionError e) {
|
||||
// Expect to catch error asserting 'simple' as invalid
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleDefaultStarts() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.STARTS_WITH);
|
||||
runner.setProperty("simple", "start");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("simple", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("simple").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleDefaultEnd() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.ENDS_WITH);
|
||||
runner.setProperty("simple", "end");
|
||||
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("simple", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("simple").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRouteLineToMultipleRelationships() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
|
||||
runner.setProperty("t", "t");
|
||||
runner.setProperty("e", "e");
|
||||
runner.setProperty("z", "z");
|
||||
|
||||
final String originalText = "start middle end\nnot match";
|
||||
runner.enqueue(originalText.getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("t", 1);
|
||||
runner.assertTransferCount("e", 1);
|
||||
runner.assertTransferCount("z", 0);
|
||||
runner.assertTransferCount("unmatched", 0);
|
||||
runner.assertTransferCount("original", 1);
|
||||
|
||||
runner.getFlowFilesForRelationship("t").get(0).assertContentEquals(originalText);
|
||||
runner.getFlowFilesForRelationship("e").get(0).assertContentEquals("start middle end\n");
|
||||
runner.getFlowFilesForRelationship("z").isEmpty();
|
||||
runner.getFlowFilesForRelationship("original").get(0).assertContentEquals(originalText);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupSameRelationship() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
|
||||
runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),.*");
|
||||
runner.setProperty("o", "o");
|
||||
|
||||
final String originalText = "1,hello\n2,world\n1,good-bye";
|
||||
runner.enqueue(originalText.getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("o", 2);
|
||||
runner.assertTransferCount("unmatched", 0);
|
||||
runner.assertTransferCount("original", 1);
|
||||
|
||||
final List<MockFlowFile> list = runner.getFlowFilesForRelationship("o");
|
||||
|
||||
boolean found1 = false;
|
||||
boolean found2 = false;
|
||||
|
||||
for (final MockFlowFile mff : list) {
|
||||
if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) {
|
||||
mff.assertContentEquals("1,hello\n1,good-bye");
|
||||
found1 = true;
|
||||
} else {
|
||||
mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "2");
|
||||
mff.assertContentEquals("2,world\n");
|
||||
found2 = true;
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(found1);
|
||||
assertTrue(found2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleGroupsSameRelationship() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
|
||||
runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),(.*?),.*");
|
||||
runner.setProperty("o", "o");
|
||||
|
||||
final String originalText = "1,5,hello\n2,5,world\n1,8,good-bye\n1,5,overt";
|
||||
runner.enqueue(originalText.getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("o", 3);
|
||||
runner.assertTransferCount("unmatched", 0);
|
||||
runner.assertTransferCount("original", 1);
|
||||
|
||||
final List<MockFlowFile> list = runner.getFlowFilesForRelationship("o");
|
||||
|
||||
boolean found1 = false;
|
||||
boolean found2 = false;
|
||||
boolean found3 = false;
|
||||
|
||||
for (final MockFlowFile mff : list) {
|
||||
if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1, 5")) {
|
||||
mff.assertContentEquals("1,5,hello\n1,5,overt");
|
||||
found1 = true;
|
||||
} else if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("2, 5")) {
|
||||
mff.assertContentEquals("2,5,world\n");
|
||||
found2 = true;
|
||||
} else {
|
||||
mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "1, 8");
|
||||
mff.assertContentEquals("1,8,good-bye\n");
|
||||
found3 = true;
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(found1);
|
||||
assertTrue(found2);
|
||||
assertTrue(found3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupDifferentRelationships() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
|
||||
runner.setProperty(RouteText.GROUPING_REGEX, "(.*?),.*");
|
||||
runner.setProperty("l", "l");
|
||||
|
||||
final String originalText = "1,hello\n2,world\n1,good-bye\n3,ciao";
|
||||
runner.enqueue(originalText.getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("l", 2);
|
||||
runner.assertTransferCount("unmatched", 2);
|
||||
runner.assertTransferCount("original", 1);
|
||||
|
||||
List<MockFlowFile> lFlowFiles = runner.getFlowFilesForRelationship("l");
|
||||
boolean found1 = false;
|
||||
boolean found2 = false;
|
||||
for (final MockFlowFile mff : lFlowFiles) {
|
||||
if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) {
|
||||
mff.assertContentEquals("1,hello\n");
|
||||
found1 = true;
|
||||
} else {
|
||||
mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "2");
|
||||
mff.assertContentEquals("2,world\n");
|
||||
found2 = true;
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(found1);
|
||||
assertTrue(found2);
|
||||
|
||||
List<MockFlowFile> unmatchedFlowFiles = runner.getFlowFilesForRelationship("unmatched");
|
||||
found1 = false;
|
||||
boolean found3 = false;
|
||||
for (final MockFlowFile mff : unmatchedFlowFiles) {
|
||||
if (mff.getAttribute(RouteText.GROUP_ATTRIBUTE_KEY).equals("1")) {
|
||||
mff.assertContentEquals("1,good-bye\n");
|
||||
found1 = true;
|
||||
} else {
|
||||
mff.assertAttributeEquals(RouteText.GROUP_ATTRIBUTE_KEY, "3");
|
||||
mff.assertContentEquals("3,ciao");
|
||||
found3 = true;
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue(found1);
|
||||
assertTrue(found3);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleDefaultContains() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
|
||||
runner.setProperty("simple", "middle");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("simple", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("simple").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleContainsIgnoreCase() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
|
||||
runner.setProperty(RouteText.IGNORE_CASE, "true");
|
||||
runner.setProperty("simple", "miDDlE");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("simple", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("simple").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSimpleDefaultEquals() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.EQUALS);
|
||||
runner.setProperty("simple", "start middle end");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("simple", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("simple").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleDefaultMatchRegularExpression() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.MATCHES_REGULAR_EXPRESSION);
|
||||
runner.setProperty("simple", ".*(mid).*");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("simple", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("simple").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleDefaultContainRegularExpression() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS_REGULAR_EXPRESSION);
|
||||
runner.setProperty("simple", "(m.d)");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("simple", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("simple").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------ */
|
||||
|
||||
@Test
|
||||
public void testSimpleAnyStarts() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.STARTS_WITH);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES);
|
||||
runner.setProperty("simple", "start");
|
||||
runner.setProperty("no", "no match");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("matched", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("matched").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleAnyEnds() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.ENDS_WITH);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES);
|
||||
runner.setProperty("simple", "end");
|
||||
runner.setProperty("no", "no match");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("matched", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("matched").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleAnyEquals() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.EQUALS);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES);
|
||||
runner.setProperty("simple", "start middle end");
|
||||
runner.setProperty("no", "no match");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("matched", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("matched").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleAnyMatchRegularExpression() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.MATCHES_REGULAR_EXPRESSION);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES);
|
||||
runner.setProperty("simple", ".*(m.d).*");
|
||||
runner.setProperty("no", "no match");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("matched", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("matched").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleAnyContainRegularExpression() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS_REGULAR_EXPRESSION);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHED_WHEN_ANY_PROPERTY_MATCHES);
|
||||
runner.setProperty("simple", "(m.d)");
|
||||
runner.setProperty("no", "no match");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("matched", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("matched").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------ */
|
||||
|
||||
@Test
|
||||
public void testSimpleAllStarts() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.STARTS_WITH);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH);
|
||||
runner.setProperty("simple", "start middle");
|
||||
runner.setProperty("second", "star");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("matched", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("matched").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleAllEnds() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.ENDS_WITH);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH);
|
||||
runner.setProperty("simple", "middle end");
|
||||
runner.setProperty("second", "nd");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("matched", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("matched").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleAllEquals() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.EQUALS);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH);
|
||||
runner.setProperty("simple", "start middle end");
|
||||
runner.setProperty("second", "start middle end");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("matched", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("matched").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleAllMatchRegularExpression() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.MATCHES_REGULAR_EXPRESSION);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH);
|
||||
runner.setProperty("simple", ".*(m.d).*");
|
||||
runner.setProperty("second", ".*(t.*m).*");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("matched", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("matched").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleAllContainRegularExpression() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS_REGULAR_EXPRESSION);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHED_WHEN_ALL_PROPERTIES_MATCH);
|
||||
runner.setProperty("simple", "(m.d)");
|
||||
runner.setProperty("second", "(t.*m)");
|
||||
|
||||
runner.enqueue("start middle end\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("matched", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("matched").get(0);
|
||||
outMatched.assertContentEquals("start middle end\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRouteOnPropertiesStartsWindowsNewLine() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.STARTS_WITH);
|
||||
runner.setProperty("simple", "start");
|
||||
|
||||
runner.enqueue("start middle end\r\nnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("simple", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("simple").get(0);
|
||||
outMatched.assertContentEquals("start middle end\r\n".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRouteOnPropertiesStartsJustCarriageReturn() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.STARTS_WITH);
|
||||
runner.setProperty("simple", "start");
|
||||
|
||||
runner.enqueue("start middle end\rnot match".getBytes("UTF-8"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("simple", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
final MockFlowFile outMatched = runner.getFlowFilesForRelationship("simple").get(0);
|
||||
outMatched.assertContentEquals("start middle end\r".getBytes("UTF-8"));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
outUnmatched.assertContentEquals("not match".getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJson() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.STARTS_WITH);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHING_PROPERTY_NAME);
|
||||
runner.setProperty("greeting", "\"greeting\"");
|
||||
runner.setProperty("address", "\"address\"");
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/TestJson/json-sample.json"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("greeting", 1);
|
||||
runner.assertTransferCount("address", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
|
||||
// Verify text is trimmed
|
||||
final MockFlowFile outGreeting = runner.getFlowFilesForRelationship("greeting").get(0);
|
||||
String outGreetingString = new String(runner.getContentAsByteArray(outGreeting));
|
||||
assertEquals(7, countLines(outGreetingString));
|
||||
final MockFlowFile outAddress = runner.getFlowFilesForRelationship("address").get(0);
|
||||
String outAddressString = new String(runner.getContentAsByteArray(outAddress));
|
||||
assertEquals(7, countLines(outAddressString));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
String outUnmatchedString = new String(runner.getContentAsByteArray(outUnmatched));
|
||||
assertEquals(400, countLines(outUnmatchedString));
|
||||
|
||||
final MockFlowFile outOriginal = runner.getFlowFilesForRelationship("original").get(0);
|
||||
outOriginal.assertContentEquals(Paths.get("src/test/resources/TestJson/json-sample.json"));
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testXml() throws IOException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new RouteText());
|
||||
runner.setProperty(RouteText.MATCH_STRATEGY, RouteText.CONTAINS);
|
||||
runner.setProperty(RouteText.ROUTE_STRATEGY, RouteText.ROUTE_TO_MATCHING_PROPERTY_NAME);
|
||||
runner.setProperty("NodeType", "name=\"NodeType\"");
|
||||
runner.setProperty("element", "<xs:element");
|
||||
runner.setProperty("name", "name=");
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/TestXml/XmlBundle.xsd"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount("NodeType", 1);
|
||||
runner.assertTransferCount("element", 1);
|
||||
runner.assertTransferCount("name", 1);
|
||||
runner.assertTransferCount("unmatched", 1);
|
||||
runner.assertTransferCount("original", 1);
|
||||
|
||||
|
||||
// Verify text is trimmed
|
||||
final MockFlowFile outNode = runner.getFlowFilesForRelationship("NodeType").get(0);
|
||||
String outNodeString = new String(runner.getContentAsByteArray(outNode));
|
||||
assertEquals(1, countLines(outNodeString));
|
||||
final MockFlowFile outElement = runner.getFlowFilesForRelationship("element").get(0);
|
||||
String outElementString = new String(runner.getContentAsByteArray(outElement));
|
||||
assertEquals(4, countLines(outElementString));
|
||||
final MockFlowFile outName = runner.getFlowFilesForRelationship("name").get(0);
|
||||
String outNameString = new String(runner.getContentAsByteArray(outName));
|
||||
assertEquals(7, countLines(outNameString));
|
||||
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship("unmatched").get(0);
|
||||
String outUnmatchedString = new String(runner.getContentAsByteArray(outUnmatched));
|
||||
assertEquals(26, countLines(outUnmatchedString));
|
||||
|
||||
final MockFlowFile outOriginal = runner.getFlowFilesForRelationship("original").get(0);
|
||||
outOriginal.assertContentEquals(Paths.get("src/test/resources/TestXml/XmlBundle.xsd"));
|
||||
}
|
||||
|
||||
public static int countLines(String str) {
|
||||
if (str == null || str.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
int lines = 0;
|
||||
int pos = 0;
|
||||
while ((pos = str.indexOf(System.lineSeparator(), pos) + 1) != 0) {
|
||||
lines++;
|
||||
}
|
||||
return lines;
|
||||
}
|
||||
}
|
Binary file not shown.
After Width: | Height: | Size: 25 KiB |
Loading…
Reference in New Issue