diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java index fdcfac3ead..8077905357 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java @@ -36,8 +36,6 @@ import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.supercsv.cellprocessor.Optional; import org.supercsv.cellprocessor.ParseBigDecimal; @@ -67,14 +65,16 @@ import org.supercsv.io.CsvListReader; import org.supercsv.prefs.CsvPreference; import org.supercsv.util.CsvContext; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStream; import java.io.Reader; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -82,7 +82,7 @@ import java.util.concurrent.atomic.AtomicReference; @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"csv", "schema", "validation"}) -@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. 
" + +@CapabilityDescription("Validates the contents of FlowFiles or a FlowFile attribute value against a user-specified CSV schema. " + "Take a look at the additional documentation of this processor for some schema examples.") @WritesAttributes({ @WritesAttribute(attribute = "count.valid.lines", description = "If line by line validation, number of valid lines extracted from the source data"), @@ -116,8 +116,8 @@ public class ValidateCsv extends AbstractProcessor { .displayName("Schema") .description("The schema to be used for validation. Is expected a comma-delimited string representing the cell " + "processors to apply. The following cell processors are allowed in the schema definition: " - + ALLOWED_OPERATORS + ". Note: cell processors cannot be nested except with Optional.") - .required(true) + + ALLOWED_OPERATORS + ". Note: cell processors cannot be nested except with Optional. Schema is required if Header is false.") + .required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); @@ -172,6 +172,16 @@ public class ValidateCsv extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor CSV_SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("CSV Source Attribute") + .displayName("CSV Source Attribute") + .description("The name of the attribute containing CSV data to be validated. 
If this property is blank, the FlowFile content will be validated.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) + .dependsOn(VALIDATION_STRATEGY, VALIDATE_WHOLE_FLOWFILE.getValue()) + .build(); + public static final PropertyDescriptor INCLUDE_ALL_VIOLATIONS = new PropertyDescriptor.Builder() .name("validate-csv-violations") .displayName("Include all violations") @@ -187,6 +197,7 @@ public class ValidateCsv extends AbstractProcessor { private static final List PROPERTIES = List.of( SCHEMA, + CSV_SOURCE_ATTRIBUTE, HEADER, DELIMITER_CHARACTER, QUOTE_CHARACTER, @@ -201,7 +212,8 @@ public class ValidateCsv extends AbstractProcessor { .build(); public static final Relationship REL_INVALID = new Relationship.Builder() .name("invalid") - .description("FlowFiles that are not valid according to the specified schema are routed to this relationship") + .description("FlowFiles that are not valid according to the specified schema," + + " or no schema or CSV header can be identified, are routed to this relationship") .build(); private static final Set RELATIONSHIPS = Set.of( @@ -223,6 +235,7 @@ public class ValidateCsv extends AbstractProcessor { protected Collection customValidate(ValidationContext context) { PropertyValue schemaProp = context.getProperty(SCHEMA); + PropertyValue headerProp = context.getProperty(HEADER); String schema = schemaProp.getValue(); String subject = SCHEMA.getName(); @@ -231,7 +244,11 @@ public class ValidateCsv extends AbstractProcessor { } // If no Expression Language is present, try parsing the schema try { - this.parseSchema(schema); + if (schema != null) { + this.parseSchema(schema); + } else if (!Boolean.TRUE.equals(headerProp.asBoolean())) { /* null-safe: an unset Header is treated like false */ + throw new IllegalArgumentException("Schema cannot be empty if Header property is false."); /* reported through the catch block below */ + } } catch (Exception e) { final List problems = new ArrayList<>(1); problems.add(new ValidationResult.Builder().subject(subject) @@ -449,155 
+466,154 @@ public class ValidateCsv extends AbstractProcessor { final CsvPreference csvPref = getPreference(context, flowFile); final boolean header = context.getProperty(HEADER).asBoolean(); final ComponentLog logger = getLogger(); - final String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue(); - final CellProcessor[] cellProcs = this.parseSchema(schema); - final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue()); + String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue(); + CellProcessor[] cellProcs = null; + if (schema != null) { + cellProcs = this.parseSchema(schema); + } + final String validationStrategy = context.getProperty(VALIDATION_STRATEGY).getValue(); + final boolean isWholeFFValidation = !validationStrategy.equals(VALIDATE_LINES_INDIVIDUALLY.getValue()); final boolean includeAllViolations = context.getProperty(INCLUDE_ALL_VIOLATIONS).asBoolean(); - final AtomicReference valid = new AtomicReference<>(true); + boolean valid = true; + int okCount = 0; + int totalCount = 0; + FlowFile invalidFF = null; + FlowFile validFF = null; + String validationError = null; final AtomicReference isFirstLineValid = new AtomicReference<>(true); final AtomicReference isFirstLineInvalid = new AtomicReference<>(true); - final AtomicReference okCount = new AtomicReference<>(0); - final AtomicReference totalCount = new AtomicReference<>(0); - final AtomicReference invalidFF = new AtomicReference<>(null); - final AtomicReference validFF = new AtomicReference<>(null); - final AtomicReference validationError = new AtomicReference<>(null); if (!isWholeFFValidation) { - invalidFF.set(session.create(flowFile)); - validFF.set(session.create(flowFile)); + invalidFF = session.create(flowFile); + validFF = session.create(flowFile); } - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream 
in) throws IOException { - try (final NifiCsvListReader listReader = new NifiCsvListReader(new InputStreamReader(in), csvPref)) { + InputStream stream; + if (context.getProperty(CSV_SOURCE_ATTRIBUTE).isSet() && isWholeFFValidation) { + String csvAttribute = flowFile.getAttribute(context.getProperty(CSV_SOURCE_ATTRIBUTE).evaluateAttributeExpressions().getValue()); + stream = new ByteArrayInputStream(Objects.requireNonNullElse(csvAttribute, "").getBytes(StandardCharsets.UTF_8)); + } else { + stream = session.read(flowFile); + } - // handling of header - if (header) { + stream: try (final NifiCsvListReader listReader = new NifiCsvListReader(new InputStreamReader(stream, StandardCharsets.UTF_8), csvPref)) { /* decode explicitly as UTF-8 so the attribute bytes encoded above round-trip regardless of the platform default charset */ - // read header - listReader.read(); + // handling of header + if (header) { - if (!isWholeFFValidation) { - invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - out.write(print(listReader.getUntokenizedRow(), csvPref, true)); - } - })); - validFF.set(session.append(validFF.get(), new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - out.write(print(listReader.getUntokenizedRow(), csvPref, true)); - } - })); - isFirstLineValid.set(false); - isFirstLineInvalid.set(false); - } + // read header + List headers = listReader.read(); + + if (schema == null) { + if (headers != null && !headers.isEmpty()) { + String newSchema = "Optional(StrNotNullOrEmpty()),".repeat(headers.size()); + schema = newSchema.substring(0, newSchema.length() - 1); + cellProcs = this.parseSchema(schema); + } else { + validationError = "No schema or CSV header could be identified."; + valid = false; + break stream; } + } - boolean stop = false; - - while (!stop) { - try { - - // read next row and check if no more row - stop = listReader.read(includeAllViolations && valid.get(), cellProcs) == null; - - if (!isWholeFFValidation && !stop) { - 
validFF.set(session.append(validFF.get(), new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineValid.get())); - } - })); - okCount.set(okCount.get() + 1); - - if (isFirstLineValid.get()) { - isFirstLineValid.set(false); - } - } - - } catch (final SuperCsvException e) { - valid.set(false); - if (isWholeFFValidation) { - validationError.set(e.getLocalizedMessage()); - logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", flowFile, e); - break; - } else { - // we append the invalid line to the flow file that will be routed to invalid relationship - invalidFF.set(session.append(invalidFF.get(), new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineInvalid.get())); - } - })); - - if (isFirstLineInvalid.get()) { - isFirstLineInvalid.set(false); - } - - if (validationError.get() == null) { - validationError.set(e.getLocalizedMessage()); - } - } - } finally { - if (!isWholeFFValidation) { - totalCount.set(totalCount.get() + 1); - } - } - } - - } catch (final IOException e) { - valid.set(false); - logger.error("Failed to validate {} against schema due to {}", flowFile, e); + if (!isWholeFFValidation) { + invalidFF = session.append(invalidFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, true))); + validFF = session.append(validFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, true))); + isFirstLineValid.set(false); + isFirstLineInvalid.set(false); } } - }); + + boolean stop = false; + + while (!stop) { + try { + + // read next row and check if no more row + stop = listReader.read(includeAllViolations && valid, cellProcs) == null; + + if (!isWholeFFValidation && !stop) { + validFF = session.append(validFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, 
isFirstLineValid.get()))); + okCount++; + + if (isFirstLineValid.get()) { + isFirstLineValid.set(false); + } + } + } catch (final SuperCsvException e) { + valid = false; + if (isWholeFFValidation) { + validationError = e.getLocalizedMessage(); + logger.debug("Failed to validate {} against schema due to {}; routing to 'invalid'", flowFile, e); + break; + } else { + // we append the invalid line to the flow file that will be routed to invalid relationship + invalidFF = session.append(invalidFF, out -> out.write(print(listReader.getUntokenizedRow(), csvPref, isFirstLineInvalid.get()))); + + if (isFirstLineInvalid.get()) { + isFirstLineInvalid.set(false); + } + + if (validationError == null) { + validationError = e.getLocalizedMessage(); + } + } + } finally { + if (!isWholeFFValidation) { + totalCount++; + } + } + } + + } catch (final IOException e) { + valid = false; + logger.error("Failed to validate {} against schema due to {}", flowFile, e); + } if (isWholeFFValidation) { - if (valid.get()) { + if (valid) { logger.debug("Successfully validated {} against schema; routing to 'valid'", flowFile); session.getProvenanceReporter().route(flowFile, REL_VALID); session.transfer(flowFile, REL_VALID); } else { session.getProvenanceReporter().route(flowFile, REL_INVALID); - session.putAttribute(flowFile, "validation.error.message", validationError.get()); + session.putAttribute(flowFile, "validation.error.message", validationError); session.transfer(flowFile, REL_INVALID); } } else { - if (valid.get()) { - logger.debug("Successfully validated {} against schema; routing to 'valid'", validFF.get()); - session.getProvenanceReporter().route(validFF.get(), REL_VALID, "All " + totalCount.get() + " line(s) are valid"); - session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(totalCount.get())); - session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get())); - session.transfer(validFF.get(), REL_VALID); - 
session.remove(invalidFF.get()); + if (valid) { + logger.debug("Successfully validated {} against schema; routing to 'valid'", validFF); + session.getProvenanceReporter().route(validFF, REL_VALID, "All " + totalCount + " line(s) are valid"); + session.putAttribute(validFF, "count.valid.lines", Integer.toString(totalCount)); + session.putAttribute(validFF, "count.total.lines", Integer.toString(totalCount)); + session.transfer(validFF, REL_VALID); + session.remove(invalidFF); session.remove(flowFile); - } else if (okCount.get() != 0) { + } else if (okCount != 0) { // because of the finally within the 'while' loop - totalCount.set(totalCount.get() - 1); + totalCount--; - logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'", okCount.get(), totalCount.get(), flowFile); - session.getProvenanceReporter().route(validFF.get(), REL_VALID, okCount.get() + " valid line(s)"); - session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get())); - session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(okCount.get())); - session.transfer(validFF.get(), REL_VALID); - session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)"); - session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString((totalCount.get() - okCount.get()))); - session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get())); - session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get()); - session.transfer(invalidFF.get(), REL_INVALID); + logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'", + okCount, totalCount, flowFile); + session.getProvenanceReporter().route(validFF, REL_VALID, okCount + " valid line(s)"); + session.putAttribute(validFF, "count.total.lines", 
Integer.toString(totalCount)); + session.putAttribute(validFF, "count.valid.lines", Integer.toString(okCount)); + session.transfer(validFF, REL_VALID); + session.getProvenanceReporter().route(invalidFF, REL_INVALID, (totalCount - okCount) + " invalid line(s)"); + session.putAttribute(invalidFF, "count.invalid.lines", Integer.toString((totalCount - okCount))); + session.putAttribute(invalidFF, "count.total.lines", Integer.toString(totalCount)); + session.putAttribute(invalidFF, "validation.error.message", validationError); + session.transfer(invalidFF, REL_INVALID); session.remove(flowFile); } else { - logger.debug("All lines in {} are invalid; routing to 'invalid'", invalidFF.get()); - session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid"); - session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString(totalCount.get())); - session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get())); - session.putAttribute(invalidFF.get(), "validation.error.message", validationError.get()); - session.transfer(invalidFF.get(), REL_INVALID); - session.remove(validFF.get()); + logger.debug("All lines in {} are invalid; routing to 'invalid'", invalidFF); + session.getProvenanceReporter().route(invalidFF, REL_INVALID, "All " + totalCount + " line(s) are invalid"); + session.putAttribute(invalidFF, "count.invalid.lines", Integer.toString(totalCount)); + session.putAttribute(invalidFF, "count.total.lines", Integer.toString(totalCount)); + session.putAttribute(invalidFF, "validation.error.message", validationError); + session.transfer(invalidFF, REL_INVALID); + session.remove(validFF); session.remove(flowFile); } } diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java index 2423db913d..c518bdfd29 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java @@ -16,10 +16,14 @@ */ package org.apache.nifi.processors.standard; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; +import java.util.HashMap; +import java.util.Map; + public class TestValidateCsv { @Test @@ -164,6 +168,106 @@ public class TestValidateCsv { runner.assertTransferCount(ValidateCsv.REL_INVALID, 1); } + @Test + public void testNoSchema() { + final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv()); + runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ","); + runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n"); + runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\""); + runner.setProperty(ValidateCsv.HEADER, "true"); + + runner.enqueue("bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647"); + runner.run(); + runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1); + + runner.clearTransferState(); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(ValidateCsv.REL_INVALID, 1); + } + + @Test + public void testValidateOnAttribute() { + final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv()); + runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ","); + runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n"); + runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\""); + runner.setProperty(ValidateCsv.HEADER, "true"); + runner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE"); + 
runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue()); + final Map attributeMap = new HashMap<>(); + attributeMap.put("CSV_ATTRIBUTE", "bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647"); + + runner.enqueue("FlowFile Random Data", attributeMap); + runner.run(); + runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1); + runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile Random Data"); + } + + @Test + public void testValidateOnAttributeDoesNotExist() { + final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv()); + runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ","); + runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n"); + runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\""); + runner.setProperty(ValidateCsv.HEADER, "true"); + runner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE"); + runner.setProperty(ValidateCsv.SCHEMA, "ParseInt(),ParseInt(),ParseInt()"); + runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue()); + final Map attributeMap = new HashMap<>(); + attributeMap.put("CSV_ATTRIBUTE_BAD", "bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647"); + + runner.enqueue("FlowFile Random Data", attributeMap); + runner.run(); + runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1); + runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile Random Data"); + + runner.clearTransferState(); + attributeMap.clear(); + attributeMap.put("CSV_ATTRIBUTE", ""); + runner.enqueue("FlowFile Random Data", attributeMap); + runner.run(); + runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1); + runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).getFirst().assertContentEquals("FlowFile Random Data"); + } + + @Test + public void testValidateOnAttributeDoesNotExistNoSchema() { + final TestRunner runner 
= TestRunners.newTestRunner(new ValidateCsv()); + runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ","); + runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n"); + runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\""); + runner.setProperty(ValidateCsv.HEADER, "true"); + runner.setProperty(ValidateCsv.CSV_SOURCE_ATTRIBUTE, "CSV_ATTRIBUTE"); + runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue()); + final Map attributeMap = new HashMap<>(); + attributeMap.put("CSV_ATTRIBUTE_BAD", "bigdecimal,bool,char,integer,long\r\n10.0001,true,c,1,92147483647"); + + runner.enqueue("FlowFile Random Data", attributeMap); + runner.run(); + runner.assertAllFlowFilesTransferred(ValidateCsv.REL_INVALID, 1); + MockFlowFile flowfile = runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).getFirst(); + flowfile.assertAttributeEquals("validation.error.message", + "No schema or CSV header could be identified."); + flowfile.assertContentEquals("FlowFile Random Data"); + } + + @Test + public void testValidateEmptyFile() { + final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv()); + runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ","); + runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n"); + runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\""); + runner.setProperty(ValidateCsv.HEADER, "true"); + runner.setProperty(ValidateCsv.SCHEMA, "ParseInt(),ParseInt(),ParseInt()"); + runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_WHOLE_FLOWFILE.getValue()); + final Map attributeMap = new HashMap<>(); + + runner.enqueue(new byte[0], attributeMap); + runner.run(); + runner.assertAllFlowFilesTransferred(ValidateCsv.REL_VALID, 1); + } + @Test public void testEqualsNotNullStrNotNullOrEmpty() { final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv());