NIFI-12674 Modified ValidateCSV to make the schema optional if a header is provided. Added validate on attribute option.

This closes #8362
Signed-off-by: Mike Moser <mosermw@apache.org>
This commit is contained in:
Freedom9339 2024-02-06 14:30:55 +00:00 committed by Mike Moser
parent 5c3499a008
commit 0190374e56
2 changed files with 247 additions and 127 deletions

View File

@@ -36,8 +36,6 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ParseBigDecimal;
@@ -67,14 +65,16 @@ import org.supercsv.io.CsvListReader;
import org.supercsv.prefs.CsvPreference;
import org.supercsv.util.CsvContext;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@@ -82,7 +82,7 @@ import java.util.concurrent.atomic.AtomicReference;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"csv", "schema", "validation"})
@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
@CapabilityDescription("Validates the contents of FlowFiles or a FlowFile attribute value against a user-specified CSV schema. " +
"Take a look at the additional documentation of this processor for some schema examples.")
@WritesAttributes({
@WritesAttribute(attribute = "count.valid.lines", description = "If line by line validation, number of valid lines extracted from the source data"),
@@ -116,8 +116,8 @@ public class ValidateCsv extends AbstractProcessor {
.displayName("Schema")
.description("The schema to be used for validation. Is expected a comma-delimited string representing the cell "
+ "processors to apply. The following cell processors are allowed in the schema definition: "
+ ALLOWED_OPERATORS + ". Note: cell processors cannot be nested except with Optional.")
.required(true)
+ ALLOWED_OPERATORS + ". Note: cell processors cannot be nested except with Optional. Schema is required if Header is false.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
@@ -172,6 +172,16 @@
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
// Optional property: when set (only applicable while validating the whole FlowFile,
// per the dependsOn below), the CSV data to validate is read from the named FlowFile
// attribute; when blank, the FlowFile content is validated instead.
// Expression Language is evaluated against the environment only, not FlowFile attributes.
public static final PropertyDescriptor CSV_SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("CSV Source Attribute")
.displayName("CSV Source Attribute")
.description("The name of the attribute containing CSV data to be validated. If this property is blank, the FlowFile content will be validated.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
.dependsOn(VALIDATION_STRATEGY, VALIDATE_WHOLE_FLOWFILE.getValue())
.build();
public static final PropertyDescriptor INCLUDE_ALL_VIOLATIONS = new PropertyDescriptor.Builder()
.name("validate-csv-violations")
.displayName("Include all violations")
@@ -187,6 +197,7 @@
private static final List<PropertyDescriptor> PROPERTIES = List.of(
SCHEMA,
CSV_SOURCE_ATTRIBUTE,
HEADER,
DELIMITER_CHARACTER,
QUOTE_CHARACTER,
@@ -201,7 +212,8 @@
.build();
public static final Relationship REL_INVALID = new Relationship.Builder()
.name("invalid")
.description("FlowFiles that are not valid according to the specified schema are routed to this relationship")
.description("FlowFiles that are not valid according to the specified schema,"
+ " or no schema or CSV header can be identified, are routed to this relationship")
.build();
private static final Set<Relationship> RELATIONSHIPS = Set.of(
@@ -223,6 +235,7 @@
protected Collection<ValidationResult> customValidate(ValidationContext context) {
PropertyValue schemaProp = context.getProperty(SCHEMA);
PropertyValue headerProp = context.getProperty(HEADER);
String schema = schemaProp.getValue();
String subject = SCHEMA.getName();
@@ -231,7 +244,11 @@
}
// If no Expression Language is present, try parsing the schema
try {
this.parseSchema(schema);
if (schema != null) {
this.parseSchema(schema);
} else if (!headerProp.asBoolean()) {
throw(new Exception("Schema cannot be empty if Header property is false."));
}
} catch (Exception e) {
final List<ValidationResult> problems = new ArrayList<>(1);
problems.add(new ValidationResult.Builder().subject(subject)
@@ -449,155 +466,154 @@
final CsvPreference csvPref = getPreference(context, flowFile);
final boolean header = context.getProperty(HEADER).asBoolean();
final ComponentLog logger = getLogger();
final String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
final CellProcessor[] cellProcs = this.parseSchema(schema);
final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue());
String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue();
CellProcessor[] cellProcs = null;
if (schema != null) {
cellProcs = this.parseSchema(schema);
}
final String validationStrategy = context.getProperty(VALIDATION_STRATEGY).getValue();
final boolean isWholeFFValidation = !validationStrategy.equals(VALIDATE_LINES_INDIVIDUALLY.getValue());
final boolean includeAllViolations = context.getProperty(INCLUDE_ALL_VIOLATIONS).asBoolean();
final AtomicReference<Boolean> valid = new AtomicReference<>(true);
boolean valid = true;
int okCount = 0;
int totalCount = 0;
FlowFile invalidFF = null;
FlowFile validFF = null;
String validationError = null;
final AtomicReference<Boolean> isFirstLineValid = new AtomicReference<>(true);
final AtomicReference<Boolean> isFirstLineInvalid = new AtomicReference<>(true);
final AtomicReference<Integer> okCount = new AtomicReference<>(0);
final AtomicReference<Integer> totalCount = new AtomicReference<>(0);
final AtomicReference<FlowFile> invalidFF = new AtomicReference<>(null);
final AtomicReference<FlowFile> validFF = new AtomicReference<>(null);
final AtomicReference<String> validationError = new AtomicReference<>(null);
if (!isWholeFFValidation) {
invalidFF.set(session.create(flowFile));
validFF.set(session.create(flowFile));
invalidFF = session.create(flowFile);
validFF = session.create(flowFile);
}
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final NifiCsvListReader listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref)) {
InputStream stream;
if (context.getProperty(CSV_SOURCE_ATTRIBUTE).isSet() && isWholeFFValidation) {
String csvAttribute = flowFile.getAttribute(context.getProperty(CSV_SOURCE_ATTRIBUTE).evaluateAttributeExpressions().getValue());
stream = new ByteArrayInputStream(Objects.requireNonNullElse(csvAttribute, "").getBytes(StandardCharsets.UTF_8));
} else {
stream = session.read(flowFile);
}
// handling of header
if (header) {
stream: try (final NifiCsvListReader listReader = new NifiCsvListReader(new InputStreamReader(stream), csvPref)) {
// read header
listReader.read();
// handling of header
if (header) {
if (!isWholeFFValidation) {
invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
}
}));
validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(listReader.getUntokenizedRow(), csvPref, true));
}
}));
isFirstLineValid.set(false);
isFirstLineInvalid.set(false);
}
// read header
List<String> headers = listReader.read();
if (schema == null) {
if (headers != null && !headers.isEmpty()) {
String newSchema = "Optional(StrNotNullOrEmpty()),".repeat(headers.size());
schema = newSchema.substring(0, newSchema.length() - 1);
cellProcs = this.parseSchema(schema);
} else {
validationError = "No schema or CSV header could be identified.";
valid = false;
break stream;
}
}
boolean stop = false;
while (!stop) {
try {
// read next row and check if no more row
stop = listReader.read(includeAllViolations && valid.get(), cellProcs) == null;
if (!isWholeFFValidation && !stop) {
validFF.set(session.append(validFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineValid.get()));
}
}));
okCount.set(okCount.get() + 1);
if (isFirstLineValid.get()) {
isFirstLineValid.set(false);
}
}
} catch (final SuperCsvException e) {
valid.set(false);
if (isWholeFFValidation) {
validationError.set(e.getLocalizedMessage());
logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", flowFile, e);
break;
} else {
// we append the invalid line to the flow file that will be routed to invalid relationship
invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineInvalid.get()));
}
}));
if (isFirstLineInvalid.get()) {
isFirstLineInvalid.set(false);
}
if (validationError.get() == null) {
validationError.set(e.getLocalizedMessage());
}
}
} finally {
if (!isWholeFFValidation) {
totalCount.set(totalCount.get() + 1);
}
}
}
} catch (final IOException e) {
valid.set(false);
logger.error("Failed to validate {} against schema due to {}", flowFile, e);
if (!isWholeFFValidation) {
invalidFF = session.append(invalidFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, true)));
validFF = session.append(validFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, true)));
isFirstLineValid.set(false);
isFirstLineInvalid.set(false);
}
}
});
boolean stop = false;
while (!stop) {
try {
// read next row and check if no more row
stop = listReader.read(includeAllViolations && valid, cellProcs) == null;
if (!isWholeFFValidation && !stop) {
validFF = session.append(validFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineValid.get())));
okCount++;
if (isFirstLineValid.get()) {
isFirstLineValid.set(false);
}
}
} catch (final SuperCsvException e) {
valid = false;
if (isWholeFFValidation) {
validationError = e.getLocalizedMessage();
logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", flowFile, e);
break;
} else {
// we append the invalid line to the flow file that will be routed to invalid relationship
invalidFF = session.append(invalidFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineInvalid.get())));
if (isFirstLineInvalid.get()) {
isFirstLineInvalid.set(false);
}
if (validationError == null) {
validationError = e.getLocalizedMessage();
}
}
} finally {
if (!isWholeFFValidation) {
totalCount++;
}
}
}
} catch (final IOException e) {
valid = false;
logger.error("Failed to validate {} against schema due to {}", flowFile, e);
}
if (isWholeFFValidation) {
if (valid.get()) {
if (valid) {
logger.debug("Successfully validated {} against schema; routing to 'valid'", flowFile);
session.getProvenanceReporter().route(flowFile, REL_VALID);
session.transfer(flowFile, REL_VALID);
} else {
session.getProvenanceReporter().route(flowFile, REL_INVALID);
session.putAttribute(flowFile, "validation.error.message", validationError.get());
session.putAttribute(flowFile, "validation.error.message", validationError);
session.transfer(flowFile, REL_INVALID);
}
} else {
if (valid.get()) {
logger.debug("Successfully validated {} against schema; routing to 'valid'", validFF.get());
session.getProvenanceReporter().route(validFF.get(), REL_VALID, "All " + totalCount.get() + " line(s) are valid");
session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(totalCount.get()));
session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
session.transfer(validFF.get(), REL_VALID);
session.remove(invalidFF.get());
if (valid) {
logger.debug("Successfully validated {} against schema; routing to 'valid'", validFF);
session.getProvenanceReporter().route(validFF, REL_VALID, "All " + totalCount + " line(s) are valid");
session.putAttribute(validFF, "count.valid.lines", Integer.toString(totalCount));
session.putAttribute(validFF, "count.total.lines", Integer.toString(totalCount));
session.transfer(validFF, REL_VALID);
session.remove(invalidFF);
session.remove(flowFile);
} else if (okCount.get() != 0) {
} else if (okCount != 0) {
// because of the finally within the 'while' loop
totalCount.set(totalCount.get() - 1);
totalCount--;
logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'", okCount.get(), totalCount.get(), flowFile);
session.getProvenanceReporter().route(validFF.get(), REL_VALID, okCount.get() + " valid line(s)");
session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(okCount.get()));
session.transfer(validFF.get(), REL_VALID);
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString((totalCount.get() - okCount.get())));
session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get());
session.transfer(invalidFF.get(), REL_INVALID);
logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
okCount, totalCount, flowFile);
session.getProvenanceReporter().route(validFF, REL_VALID, okCount + " valid line(s)");
session.putAttribute(validFF, "count.total.lines", Integer.toString(totalCount));
session.putAttribute(validFF, "count.valid.lines", Integer.toString(okCount));
session.transfer(validFF, REL_VALID);
session.getProvenanceReporter().route(invalidFF, REL_INVALID, (totalCount - okCount) + " invalid line(s)");
session.putAttribute(invalidFF, "count.invalid.lines", Integer.toString((totalCount - okCount)));
session.putAttribute(invalidFF, "count.total.lines", Integer.toString(totalCount));
session.putAttribute(invalidFF, "validation.error.message", validationError);
session.transfer(invalidFF, REL_INVALID);
session.remove(flowFile);
} else {
logger.debug("All lines in {} are invalid; routing to 'invalid'", invalidFF.get());
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString(totalCount.get()));
session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get());
session.transfer(invalidFF.get(), REL_INVALID);
session.remove(validFF.get());
logger.debug("All lines in {} are invalid; routing to 'invalid'", invalidFF);
session.getProvenanceReporter().route(invalidFF, REL_INVALID, "All " + totalCount + " line(s) are invalid");
session.putAttribute(invalidFF, "count.invalid.lines", Integer.toString(totalCount));
session.putAttribute(invalidFF, "count.total.lines", Integer.toString(totalCount));
session.putAttribute(invalidFF, "validation.error.message", validationError);
session.transfer(invalidFF, REL_INVALID);
session.remove(validFF);
session.remove(flowFile);
}
}

View File

@@ -16,10 +16,14 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
public class TestValidateCsv {
@Test
@@ -164,6 +168,106 @@
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testNoSchema() {
    // With Header=true and no Schema, a schema is derived from the header line.
    final TestRunner testRunner = TestRunners.newTestRunner(new ValidateCsv());
    testRunner.setProperty(ValidateCsv.HEADER, "true");
    testRunner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
    testRunner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
    testRunner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");

    final String csvWithHeader = "bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647";
    testRunner.enqueue(csvWithHeader);
    testRunner.run();
    testRunner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);

    // An empty FlowFile provides no header to derive a schema from, so it is invalid.
    testRunner.clearTransferState();
    testRunner.enqueue(new byte[0]);
    testRunner.run();
    testRunner.assertAllFlowFilesTransferred(ValidateCsv.REL_INVALID, 1);
}
@Test
public void testValidateOnAttribute() {
    // CSV data is supplied via an attribute; the FlowFile content is unrelated
    // and must be passed through to 'valid' untouched.
    final TestRunner testRunner = TestRunners.newTestRunner(new ValidateCsv());
    testRunner.setProperty(ValidateCsv.HEADER, "true");
    testRunner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
    testRunner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
    testRunner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
    testRunner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE");
    testRunner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());

    final Map<String, String> attributes =
            Map.of("CSV_ATTRIBUTE", "bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647");
    testRunner.enqueue("FlowFile Random Data", attributes);
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
    testRunner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile Random Data");
}
@Test
public void testValidateOnAttributeDoesNotExist() {
    final TestRunner testRunner = TestRunners.newTestRunner(new ValidateCsv());
    testRunner.setProperty(ValidateCsv.HEADER, "true");
    testRunner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
    testRunner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
    testRunner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
    testRunner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE");
    testRunner.setProperty(ValidateCsv.SCHEMA, "ParseInt(),ParseInt(),ParseInt()");
    testRunner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());

    // The configured source attribute is absent (only a differently-named one exists):
    // the FlowFile is routed to 'valid' with its content unchanged.
    testRunner.enqueue("FlowFile Random Data",
            Map.of("CSV_ATTRIBUTE_BAD", "bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647"));
    testRunner.run();
    testRunner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
    testRunner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile Random Data");

    // The source attribute exists but is empty: still routed to 'valid'.
    testRunner.clearTransferState();
    testRunner.enqueue("FlowFile Random Data", Map.of("CSV_ATTRIBUTE", ""));
    testRunner.run();
    testRunner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
    testRunner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile Random Data");
}
@Test
public void testValidateOnAttributeDoesNotExistNoSchema() {
    final TestRunner testRunner = TestRunners.newTestRunner(new ValidateCsv());
    testRunner.setProperty(ValidateCsv.HEADER, "true");
    testRunner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
    testRunner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
    testRunner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
    testRunner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE");
    testRunner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());

    // No Schema property and the configured source attribute is missing, so no header
    // can be read either: routed to 'invalid' with an explanatory error attribute.
    testRunner.enqueue("FlowFile Random Data",
            Map.of("CSV_ATTRIBUTE_BAD", "bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647"));
    testRunner.run();
    testRunner.assertAllFlowFilesTransferred(ValidateCsv.REL_INVALID, 1);

    final MockFlowFile invalid = testRunner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).getFirst();
    invalid.assertAttributeEquals("validation.error.message",
            "No schema or CSV header could be identified.");
    invalid.assertContentEquals("FlowFile Random Data");
}
@Test
public void testValidateEmptyFile() {
    // An empty FlowFile with an explicit schema is routed to 'valid'.
    final TestRunner testRunner = TestRunners.newTestRunner(new ValidateCsv());
    testRunner.setProperty(ValidateCsv.HEADER, "true");
    testRunner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ",");
    testRunner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\"");
    testRunner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n");
    testRunner.setProperty(ValidateCsv.SCHEMA, "ParseInt(),ParseInt(),ParseInt()");
    testRunner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue());

    testRunner.enqueue(new byte[0], Map.of());
    testRunner.run();
    testRunner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1);
}
@Test
public void testEqualsNotNullStrNotNullOrEmpty() {
final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());