diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java index 130713cc85..4a751f0fe3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java @@ -23,7 +23,6 @@ import java.io.OutputStream; import java.io.Reader; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -39,11 +38,8 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; @@ -185,8 +181,6 @@ public class ValidateCsv extends AbstractProcessor { private List properties; private Set relationships; - private final AtomicReference processors = new AtomicReference(); - private final AtomicReference preference = new AtomicReference(); @Override protected void init(final ProcessorInitializationContext context) { @@ -215,32 +209,14 @@ public class ValidateCsv extends AbstractProcessor { return properties; } - @Override - protected Collection customValidate(ValidationContext validationContext) { - String schema = validationContext.getProperty(SCHEMA).evaluateAttributeExpressions().getValue(); - try { - this.parseSchema(schema); - } catch (Exception e) { - final List problems = new ArrayList<>(1); - problems.add(new ValidationResult.Builder().subject(SCHEMA.getName()) - .input(schema) - .valid(false) - .explanation("Error while parsing the schema: " + e.getMessage()) - .build()); - return problems; - } - return super.customValidate(validationContext); - } - - @OnScheduled - public void setPreference(final ProcessContext context) { + public CsvPreference getPreference(final ProcessContext context, final FlowFile flowFile) { // When going from the UI to Java, the characters are escaped so that what you // input is transferred over to Java as is. So when you type the characters "\" // and "n" into the UI the Java string will end up being those two characters // not the interpreted value "\n". - final String msgDemarcator = context.getProperty(END_OF_LINE_CHARACTER).evaluateAttributeExpressions().getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); - this.preference.set(new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).evaluateAttributeExpressions().getValue().charAt(0), - context.getProperty(DELIMITER_CHARACTER).evaluateAttributeExpressions().getValue().charAt(0), msgDemarcator).build()); + final String msgDemarcator = context.getProperty(END_OF_LINE_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + return new CsvPreference.Builder(context.getProperty(QUOTE_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().charAt(0), + context.getProperty(DELIMITER_CHARACTER).evaluateAttributeExpressions(flowFile).getValue().charAt(0), msgDemarcator).build(); } /** @@ -248,15 +224,15 @@ public class ValidateCsv extends AbstractProcessor { * to a list of cell processors used to validate the CSV data. * @param schema Schema to parse */ - private void parseSchema(String schema) { - List processorsList = new ArrayList(); + private CellProcessor[] parseSchema(String schema) { + List processorsList = new ArrayList<>(); String remaining = schema; while(remaining.length() > 0) { remaining = setProcessor(remaining, processorsList); } - this.processors.set(processorsList.toArray(new CellProcessor[processorsList.size()])); + return processorsList.toArray(new CellProcessor[processorsList.size()]); } private String setProcessor(String remaining, List processorsList) { @@ -435,10 +411,11 @@ public class ValidateCsv extends AbstractProcessor { return; } - final CsvPreference csvPref = this.preference.get(); + final CsvPreference csvPref = getPreference(context, flowFile); final boolean header = context.getProperty(HEADER).asBoolean(); final ComponentLog logger = getLogger(); - final CellProcessor[] cellProcs = this.processors.get(); + final String schema = context.getProperty(SCHEMA).evaluateAttributeExpressions(flowFile).getValue(); + final CellProcessor[] cellProcs = this.parseSchema(schema); final boolean isWholeFFValidation = context.getProperty(VALIDATION_STRATEGY).getValue().equals(VALIDATE_WHOLE_FLOWFILE.getValue()); final AtomicReference valid = new AtomicReference(true); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java index 0e824759ab..21d14064c6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java @@ -313,4 +313,22 @@ public class TestValidateCsv { runner.assertValid(); } + @Test + public void testMultipleRuns() { + final TestRunner runner = TestRunners.newTestRunner(new ValidateCsv()); + runner.setProperty(ValidateCsv.DELIMITER_CHARACTER, ","); + runner.setProperty(ValidateCsv.END_OF_LINE_CHARACTER, "\r\n"); + runner.setProperty(ValidateCsv.QUOTE_CHARACTER, "\""); + runner.setProperty(ValidateCsv.HEADER, "false"); + runner.setProperty(ValidateCsv.VALIDATION_STRATEGY, ValidateCsv.VALIDATE_LINES_INDIVIDUALLY); + + runner.setProperty(ValidateCsv.SCHEMA, "Unique()"); + + runner.enqueue("John\r\nBob\r\nTom"); + runner.enqueue("John\r\nBob\r\nTom"); + runner.run(2); + + runner.assertTransferCount(ValidateCsv.REL_VALID, 2); + runner.assertTransferCount(ValidateCsv.REL_INVALID, 0); + } }