NIFI-1942 Processor to validate CSV against user-supplied schema

This closes #476

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Pierre Villard 2016-05-27 10:05:16 +02:00 committed by jpercivall
parent 022f5a506d
commit d838f61291
5 changed files with 988 additions and 0 deletions

View File

@ -244,6 +244,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-standard-utils</artifactId> <artifactId>nifi-standard-utils</artifactId>
<version>1.1.0-SNAPSHOT</version> <version>1.1.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>net.sf.supercsv</groupId>
<artifactId>super-csv</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,618 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseBigDecimal;
import org.supercsv.cellprocessor.ParseBool;
import org.supercsv.cellprocessor.ParseChar;
import org.supercsv.cellprocessor.ParseDate;
import org.supercsv.cellprocessor.ParseDouble;
import org.supercsv.cellprocessor.ParseInt;
import org.supercsv.cellprocessor.ParseLong;
import org.supercsv.cellprocessor.constraint.DMinMax;
import org.supercsv.cellprocessor.constraint.Equals;
import org.supercsv.cellprocessor.constraint.ForbidSubStr;
import org.supercsv.cellprocessor.constraint.IsIncludedIn;
import org.supercsv.cellprocessor.constraint.LMinMax;
import org.supercsv.cellprocessor.constraint.NotNull;
import org.supercsv.cellprocessor.constraint.RequireHashCode;
import org.supercsv.cellprocessor.constraint.RequireSubStr;
import org.supercsv.cellprocessor.constraint.StrMinMax;
import org.supercsv.cellprocessor.constraint.StrNotNullOrEmpty;
import org.supercsv.cellprocessor.constraint.StrRegEx;
import org.supercsv.cellprocessor.constraint.Strlen;
import org.supercsv.cellprocessor.constraint.Unique;
import org.supercsv.cellprocessor.constraint.UniqueHashCode;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.exception.SuperCsvException;
import org.supercsv.io.CsvListReader;
import org.supercsv.prefs.CsvPreference;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"csv", "schema", "validation"})
@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
"Take a look at the additional documentation of this processor for some schema examples.")
public class ValidateCsv extends AbstractProcessor {
private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
"ParseDouble", "ParseInt", "ParseLong", "Optional", "DMinMax", "Equals", "ForbidSubStr", "LMinMax", "NotNull", "Null",
"RequireHashCode", "RequireSubStr", "Strlen", "StrMinMax", "StrNotNullOrEmpty", "StrRegEx", "Unique",
"UniqueHashCode", "IsIncludedIn");
private static final String routeWholeFlowFile = "FlowFile validation";
private static final String routeLinesIndividually = "Line by line validation";
public static final AllowableValue VALIDATE_WHOLE_FLOWFILE = new AllowableValue(routeWholeFlowFile, routeWholeFlowFile,
"As soon as an error is found in the CSV file, the validation will stop and the whole flow file will be routed to the 'invalid'"
+ " relationship. This option offers best performances.");
public static final AllowableValue VALIDATE_LINES_INDIVIDUALLY = new AllowableValue(routeLinesIndividually, routeLinesIndividually,
"In case an error is found, the input CSV file will be split into two FlowFiles: one routed to the 'valid' "
+ "relationship containing all the correct lines and one routed to the 'invalid' relationship containing all "
+ "the incorrect lines. Take care if choosing this option while using Unique cell processors in schema definition:"
+ "the first occurrence will be considered valid and the next ones as invalid.");
public static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
.name("validate-csv-schema")
.displayName("Schema")
.description("The schema to be used for validation. Is expected a comma-delimited string representing the cell "
+ "processors to apply. The following cell processors are allowed in the schema definition: "
+ allowedOperators.toString() + ". Note: cell processors cannot be nested except with Optional.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor HEADER = new PropertyDescriptor.Builder()
.name("validate-csv-header")
.displayName("Header")
.description("True if the incoming flow file contains a header to ignore, false otherwise.")
.required(true)
.defaultValue("true")
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor QUOTE_CHARACTER = new PropertyDescriptor.Builder()
.name("validate-csv-quote")
.displayName("Quote character")
.description("Character used as 'quote' in the incoming data. Example: \"")
.required(true)
.defaultValue("\"")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor DELIMITER_CHARACTER = new PropertyDescriptor.Builder()
.name("validate-csv-delimiter")
.displayName("Delimiter character")
.description("Character used as 'delimiter' in the incoming data. Example: ,")
.required(true)
.defaultValue(",")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor END_OF_LINE_CHARACTER = new PropertyDescriptor.Builder()
.name("validate-csv-eol")
.displayName("End of line symbols")
.description("Symbols used as 'end of line' in the incoming data. Example: \\n")
.required(true)
.defaultValue("\\n")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor VALIDATION_STRATEGY = new PropertyDescriptor.Builder()
.name("validate-csv-strategy")
.displayName("Validation strategy")
.description("Strategy to apply when routing input files to output relationships.")
.required(true)
.defaultValue(VALIDATE_WHOLE_FLOWFILE.getValue())
.allowableValues(VALIDATE_LINES_INDIVIDUALLY, VALIDATE_WHOLE_FLOWFILE)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_VALID = new Relationship.Builder()
.name("valid")
.description("FlowFiles that are successfully validated against the schema are routed to this relationship")
.build();
public static final Relationship REL_INVALID = new Relationship.Builder()
.name("invalid")
.description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
.build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private final AtomicReference<CellProcessor[]> processors = new AtomicReference<CellProcessor[]>();
private final AtomicReference<CsvPreference> preference = new AtomicReference<CsvPreference>();
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(SCHEMA);
properties.add(HEADER);
properties.add(DELIMITER_CHARACTER);
properties.add(QUOTE_CHARACTER);
properties.add(END_OF_LINE_CHARACTER);
properties.add(VALIDATION_STRATEGY);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_VALID);
relationships.add(REL_INVALID);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
String schema = validationContext.getProperty(SCHEMA).getValue();
try {
this.parseSchema(validationContext.getProperty(SCHEMA).getValue());
} catch (Exception e) {
final List<ValidationResult> problems = new ArrayList<>(1);
problems.add(new ValidationResult.Builder().subject(SCHEMA.getName())
.input(schema)
.valid(false)
.explanation("Error while parsing the schema: " + e.getMessage())
.build());
return problems;
}
return super.customValidate(validationContext);
}
@OnScheduled
public void setPreference(final ProcessContext context) {
// When going from the UI to Java, the characters are escaped so that what you
// input is transferred over to Java as is. So when you type the characters "\"
// and "n" into the UI the Java string will end up being those two characters
// not the interpreted value "\n".
final String msgDemarcator = context.getProperty(END_OF_LINE_CHARACTER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).getValue().charAt(0),
context.getProperty(DELIMITER_CHARACTER).getValue().charAt(0), msgDemarcator).build());
}
/**
* Method used to parse the string supplied by the user. The string is converted
* to a list of cell processors used to validate the CSV data.
* @param schema Schema to parse
*/
private void parseSchema(String schema) {
List<CellProcessor> processorsList = new ArrayList<CellProcessor>();
String remaining = schema;
while(remaining.length() > 0) {
remaining = setProcessor(remaining, processorsList);
}
this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()]));
}
private String setProcessor(String remaining, List<CellProcessor> processorsList) {
StringBuffer buffer = new StringBuffer();
int i = 0;
int opening = 0;
int closing = 0;
while(buffer.length() != remaining.length()) {
char c = remaining.charAt(i);
i++;
if(opening == 0 && c == ',') {
if(i == 1) {
continue;
}
break;
}
buffer.append(c);
if(c == '(') {
opening++;
} else if(c == ')') {
closing++;
}
if(opening > 0 && opening == closing) {
break;
}
}
final String procString = buffer.toString().trim();
opening = procString.indexOf('(');
String method = procString;
String argument = null;
if(opening != -1) {
argument = method.substring(opening + 1, method.length() - 1);
method = method.substring(0, opening);
}
processorsList.add(getProcessor(method.toLowerCase(), argument));
return remaining.substring(i);
}
private CellProcessor getProcessor(String method, String argument) {
switch (method) {
case "optional":
int opening = argument.indexOf('(');
String subMethod = argument;
String subArgument = null;
if(opening != -1) {
subArgument = subMethod.substring(opening + 1, subMethod.length() - 1);
subMethod = subMethod.substring(0, opening);
}
return new Optional(getProcessor(subMethod.toLowerCase(), subArgument));
case "parsedate":
return new ParseDate(argument.substring(1, argument.length() - 1));
case "parsedouble":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("ParseDouble does not expect any argument but has " + argument);
return new ParseDouble();
case "parsebigdecimal":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("ParseBigDecimal does not expect any argument but has " + argument);
return new ParseBigDecimal();
case "parsebool":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("ParseBool does not expect any argument but has " + argument);
return new ParseBool();
case "parsechar":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("ParseChar does not expect any argument but has " + argument);
return new ParseChar();
case "parseint":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("ParseInt does not expect any argument but has " + argument);
return new ParseInt();
case "parselong":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("ParseLong does not expect any argument but has " + argument);
return new ParseLong();
case "notnull":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("NotNull does not expect any argument but has " + argument);
return new NotNull();
case "strregex":
return new StrRegEx(argument.substring(1, argument.length() - 1));
case "unique":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("Unique does not expect any argument but has " + argument);
return new Unique();
case "uniquehashcode":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("UniqueHashCode does not expect any argument but has " + argument);
return new UniqueHashCode();
case "strlen":
String[] splts = argument.split(",");
int[] requiredLengths = new int[splts.length];
for(int i = 0; i < splts.length; i++) {
requiredLengths[i] = Integer.parseInt(splts[i]);
}
return new Strlen(requiredLengths);
case "strminmax":
String[] splits = argument.split(",");
return new StrMinMax(Long.parseLong(splits[0]), Long.parseLong(splits[1]));
case "lminmax":
String[] args = argument.split(",");
return new LMinMax(Long.parseLong(args[0]), Long.parseLong(args[1]));
case "dminmax":
String[] doubles = argument.split(",");
return new DMinMax(Double.parseDouble(doubles[0]), Double.parseDouble(doubles[1]));
case "equals":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("Equals does not expect any argument but has " + argument);
return new Equals();
case "forbidsubstr":
String[] forbiddenSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
return new ForbidSubStr(forbiddenSubStrings);
case "requiresubstr":
String[] requiredSubStrings = argument.replaceAll("\"", "").split(",[ ]*");
return new RequireSubStr(requiredSubStrings);
case "strnotnullorempty":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("StrNotNullOrEmpty does not expect any argument but has " + argument);
return new StrNotNullOrEmpty();
case "requirehashcode":
String[] hashs = argument.split(",");
int[] hashcodes = new int[hashs.length];
for(int i = 0; i < hashs.length; i++) {
hashcodes[i] = Integer.parseInt(hashs[i]);
}
return new RequireHashCode(hashcodes);
case "null":
if(argument != null && !argument.isEmpty())
throw new IllegalArgumentException("Null does not expect any argument but has " + argument);
return null;
case "isincludedin":
String[] elements = argument.replaceAll("\"", "").split(",[ ]*");
return new IsIncludedIn(elements);
default:
throw new IllegalArgumentException("[" + method + "] is not an allowed method to define a Cell Processor");
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final CsvPreference csvPref = this.preference.get();
final boolean header = context.getProperty(HEADER).asBoolean();
final ComponentLog logger = getLogger();
final CellProcessor[] cellProcs = this.processors.get();
final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
final AtomicReference<Boolean> valid = new AtomicReference<Boolean>(true);
final AtomicReference<Boolean> isFirstLineValid = new AtomicReference<Boolean>(true);
final AtomicReference<Boolean> isFirstLineInvalid = new AtomicReference<Boolean>(true);
final AtomicReference<Integer> okCount = new AtomicReference<Integer>(0);
final AtomicReference<Integer> totalCount = new AtomicReference<Integer>(0);
final AtomicReference<FlowFile> invalidFF = new AtomicReference<FlowFile>(null);
final AtomicReference<FlowFile> validFF = new AtomicReference<FlowFile>(null);
if(!isWholeFFValidation) {
invalidFF.set(session.create(flowFile));
validFF.set(session.create(flowFile));
}
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
NifiCsvListReader listReader = null;
try {
listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref);
// handling of header
if(header) {
List<String> headerList = listReader.read();
if(!isWholeFFValidation) {
invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(headerList, csvPref, isFirstLineInvalid.get()));
}
}));
validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(headerList, csvPref, isFirstLineValid.get()));
}
}));
isFirstLineValid.set(false);
isFirstLineInvalid.set(false);
}
}
boolean stop = false;
while (!stop) {
try {
final List<Object> list = listReader.read(cellProcs);
stop = list == null;
if(!isWholeFFValidation && !stop) {
validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(list, csvPref, isFirstLineValid.get()));
}
}));
okCount.set(okCount.get() + 1);
if(isFirstLineValid.get()) {
isFirstLineValid.set(false);
}
}
} catch (final SuperCsvException e) {
valid.set(false);
if(isWholeFFValidation) {
logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", new Object[]{flowFile}, e);
break;
} else {
// we append the invalid line to the flow file that will be routed to invalid relationship
invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(e.getCsvContext().getRowSource(), csvPref, isFirstLineInvalid.get()));
}
}));
if(isFirstLineInvalid.get()) {
isFirstLineInvalid.set(false);
}
}
} finally {
if(!isWholeFFValidation) {
totalCount.set(totalCount.get() + 1);
}
}
}
} catch (final IOException e) {
valid.set(false);
logger.error("Failed to validate {} against schema due to {}", new Object[]{flowFile}, e);
} finally {
if(listReader != null) {
listReader.close();
}
}
}
});
if(isWholeFFValidation) {
if (valid.get()) {
logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{flowFile});
session.getProvenanceReporter().route(flowFile, REL_VALID);
session.transfer(flowFile, REL_VALID);
} else {
session.getProvenanceReporter().route(flowFile, REL_INVALID);
session.transfer(flowFile, REL_INVALID);
}
} else {
if (valid.get()) {
logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{validFF.get()});
session.getProvenanceReporter().route(validFF.get(), REL_VALID, "All " + totalCount.get() + " line(s) are valid");
session.transfer(validFF.get(), REL_VALID);
session.remove(invalidFF.get());
session.remove(flowFile);
} else if (okCount.get() != 0) {
// because of the finally within the 'while' loop
totalCount.set(totalCount.get() - 1);
logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
new Object[]{okCount.get(), totalCount.get(), flowFile});
session.getProvenanceReporter().route(validFF.get(), REL_VALID, okCount.get() + " valid line(s)");
session.transfer(validFF.get(), REL_VALID);
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
session.transfer(invalidFF.get(), REL_INVALID);
session.remove(flowFile);
} else {
logger.debug("All lines in {} are invalid; routing to 'invalid'", new Object[]{invalidFF.get()});
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
session.transfer(invalidFF.get(), REL_INVALID);
session.remove(validFF.get());
session.remove(flowFile);
}
}
}
/**
* Method used to correctly write the lines by taking into account end of line
* character and separator character.
* @param list list of elements of the current row
* @param csvPref CSV preferences
* @param isFirstLine true if this is the first line we append
* @return String to append in the flow file
*/
private byte[] print(List<?> list, CsvPreference csvPref, boolean isFirstLine) {
StringBuffer buffer = new StringBuffer();
if (!isFirstLine) {
buffer.append(csvPref.getEndOfLineSymbols());
}
final int size = list.size();
for(int i = 0; i < size; i++) {
buffer.append(list.get(i).toString());
if(i != size - 1) {
buffer.append((char) csvPref.getDelimiterChar());
}
}
return buffer.toString().getBytes();
}
/**
* This is required to avoid the side effect of Parse* cell processors. If not overriding
* this method, parsing will return objects and writing objects could result in a different
* output in comparison to the input.
*/
private class NifiCsvListReader extends CsvListReader {
public NifiCsvListReader(Reader reader, CsvPreference preferences) {
super(reader, preferences);
}
@Override
public List<Object> read(CellProcessor... processors) throws IOException {
if( processors == null ) {
throw new NullPointerException("Processors should not be null");
}
if( readRow() ) {
super.executeProcessors(new ArrayList<Object>(getColumns().size()), processors);
return new ArrayList<Object>(getColumns());
}
return null; // EOF
}
}
}

View File

@ -87,5 +87,6 @@ org.apache.nifi.processors.standard.TailFile
org.apache.nifi.processors.standard.TransformXml org.apache.nifi.processors.standard.TransformXml
org.apache.nifi.processors.standard.UnpackContent org.apache.nifi.processors.standard.UnpackContent
org.apache.nifi.processors.standard.ValidateXml org.apache.nifi.processors.standard.ValidateXml
org.apache.nifi.processors.standard.ValidateCsv
org.apache.nifi.processors.standard.ExecuteSQL org.apache.nifi.processors.standard.ExecuteSQL
org.apache.nifi.processors.standard.FetchDistributedMapCache org.apache.nifi.processors.standard.FetchDistributedMapCache

View File

@ -0,0 +1,109 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ValidateCsv</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css"/>
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Usage Information</h2>
<p>
The Validate CSV processor is based on the super-csv library and the concept of
<a href="http://super-csv.github.io/super-csv/cell_processors.html" target="_blank">Cell Processors</a>.
The corresponding java documentation can be found
<a href="http://super-csv.github.io/super-csv/apidocs/org/supercsv/cellprocessor/ift/CellProcessor.html" target="_blank">here</a>.
</p>
<p>
The cell processors cannot be nested (except with Optional which gives the possibility to define a CellProcessor for values
that could be null) and must be defined in a comma-delimited string as the Schema property.
</p>
<p>
The supported cell processors are:
<ul>
<li>ParseBigDecimal</li>
<li>ParseBool</li>
<li>ParseChar</li>
<li>ParseDate</li>
<li>ParseDouble</li>
<li>ParseInt</li>
<li>Optional</li>
<li>DMinMax</li>
<li>Equals</li>
<li>ForbidSubStr</li>
<li>LMinMax</li>
<li>NotNull</li>
<li>Null</li>
<li>RequireHashCode</li>
<li>RequireSubStr</li>
<li>Strlen</li>
<li>StrMinMax</li>
<li>StrNotNullOrEmpty</li>
<li>StrRegEx</li>
<li>Unique</li>
<li>UniqueHashCode</li>
<li>IsIncludedIn</li>
</ul>
</p>
<p>
Here are some examples:
<ul>
<b>Schema property:</b> Null, ParseDate("dd/MM/yyyy"), Optional(ParseDouble())<br />
<b>Meaning:</b> the input CSV has three columns, the first one can be null and has no specification, the second one must be a date
formatted as expected, and the third one must a double or null (no value).
</ul>
<ul>
<b>Schema property:</b> ParseBigDecimal(), ParseBool(), ParseChar(), ParseInt(), ParseLong()<br />
<b>Meaning:</b> the input CSV has five columns, the first one must be a big decimal, the second one must be a boolean,
the third one must be a char, the fourth one must be an integer and the fifth one must be a long.
</ul>
<ul>
<b>Schema property:</b> Equals(), NotNull(), StrNotNullOrEmpty()<br />
<b>Meaning:</b> the input CSV has three columns, all the values of the first column must be equal to each other, all the values
of the second column must be not null, and all the values of the third column are not null/empty string values.
</ul>
<ul>
<b>Schema property:</b> Strlen(4), StrMinMax(3,5), StrRegex("[a-z0-9\\._]+@[a-z0-9\\.]+")<br />
<b>Meaning:</b> the input CSV has three columns, all the values of the first column must be 4-characters long, all the values
of the second column must be between 3 and 5 characters (inclusive), and all the values of the last column must match
the provided regular expression (email address).
</ul>
<ul>
<b>Schema property:</b> Unique(), UniqueHashCode()<br />
<b>Meaning:</b> the input CSV has two columns. All the values of the first column must be unique (all the values are stored in
memory and this can be consuming depending of the input). All the values of the second column must be unique (only hash
codes of the input values are stored to ensure uniqueness).
</ul>
<ul>
<b>Schema property:</b> ForbidSubStr("test", "tset"), RequireSubStr("test")<br />
<b>Meaning:</b> the input CSV has two columns. None of the values in the first column must contain one of the provided strings.
And all the values of the second column must contain the provided string.
</ul>
</p>
</body>
</html>

View File

@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
public class TestValidateCsv {
@Test
public void testHeaderAndSplit() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "true");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
runner.setProperty(ValidateCsv.SCHEMA, "Null, ParseDate(\"dd/MM/yyyy\"), Optional(ParseDouble())");
runner.enqueue("Name,Birthdate,Weight\nJohn,22/11/1954,63.2\nBob,01/03/2004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/11/1954,63.2\nBob,01/03/2004,45.0");
runner.assertTransferCount(ValidateCsv.REL_INVALID, 0);
runner.clearTransferState();
runner.enqueue("Name,Birthdate,Weight\nJohn,22/11/1954,63a2\nBob,01/032004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 0);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/11/1954,63a2\nBob,01/032004,45.0");
runner.clearTransferState();
runner.enqueue("Name,Birthdate,Weight\nJohn,22/111954,63.2\nBob,01/03/2004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("Name,Birthdate,Weight\nBob,01/03/2004,45.0");
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Name,Birthdate,Weight\nJohn,22/111954,63.2");
}
@Test
public void testUniqueWithSplit() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY);
runner.setProperty(ValidateCsv.SCHEMA, "Unique()");
runner.enqueue("John\r\nBob\r\nBob\r\nJohn");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("John\r\nBob");
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Bob\r\nJohn");
}
@Test
public void testValidDateOptionalDouble() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "Null, ParseDate(\"dd/MM/yyyy\"), Optional(ParseDouble())");
runner.enqueue("John,22/11/1954,63.2\r\nBob,01/03/2004,45.0");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testIsIncludedIn() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "Null, ParseDate(\"dd/MM/yyyy\"), IsIncludedIn(\"male\", \"female\")");
runner.enqueue("John,22/11/1954,male\r\nMarie,01/03/2004,female");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("John,22/111954,63.2\r\nBob,01/03/2004,45.0");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testBigDecimalBoolCharIntLong() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "true");
runner.setProperty(ValidateCsv.SCHEMA, "ParseBigDecimal(), ParseBool(), ParseChar(), ParseInt(), ParseLong()");
runner.enqueue("bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("bigdecimal,bool,char,integer,long\r\n10.0001,true,c,92147483647,92147483647");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testEqualsNotNullStrNotNullOrEmpty() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "Equals(), NotNull(), StrNotNullOrEmpty()");
runner.enqueue("test,test,test\r\ntest,test,test");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("test,test,test\r\ntset,test,test");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testStrlenStrMinMaxStrRegex() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "Strlen(4), StrMinMax(3,5), StrRegex(\"[a-z0-9\\._]+@[a-z0-9\\.]+\")");
runner.enqueue("test,test,test@apache.org");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("test,test,testapache.org");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testDMinMaxForbidSubStrLMinMax() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "DMinMax(10,100),LMinMax(10,100),ForbidSubStr(\"test\", \"tset\")");
runner.enqueue("50.001,50,hello");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("10,10,testapache.org");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testUnique() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "Unique(), UniqueHashCode()");
runner.enqueue("1,2\r\n3,4");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("1,2\r\n1,4");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testRequire() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
int hashcode = "test".hashCode();
runner.setProperty(ValidateCsv.SCHEMA, "RequireHashCode(" + hashcode + "), RequireSubStr(\"test\")");
runner.enqueue("test,test");
runner.run();
runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
runner.enqueue("tset,tset");
runner.run();
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testValidate() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());
runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
runner.setProperty(ValidateCsv.HEADER, "false");
runner.setProperty(ValidateCsv.SCHEMA, "RequireSubString(\"test\")");
runner.assertNotValid();
runner.setProperty(ValidateCsv.SCHEMA, "''");
runner.assertNotValid();
runner.setProperty(ValidateCsv.SCHEMA, "\"\"");
runner.assertNotValid();
}
}