From 9e9c129c21061f5c66ba4246774738de7184406b Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 10 Nov 2017 22:18:44 +0100 Subject: [PATCH] NIFI-4577 - ValidateCsv - add attributes to indicate number of valid/invalid lines Signed-off-by: Matthew Burgess This closes #2268 --- .../nifi/processors/standard/ValidateCsv.java | 15 +++++++++++++++ .../nifi/processors/standard/TestValidateCsv.java | 8 ++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java index 43d3ef990d..6bb4205391 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateCsv.java @@ -35,6 +35,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -86,6 +88,11 @@ import org.supercsv.prefs.CsvPreference; @Tags({"csv", "schema", "validation"}) @CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " + "Take a look at the additional documentation of this processor for some schema examples.") +@WritesAttributes({ + @WritesAttribute(attribute="count.valid.lines", description="If line by line validation, number of valid lines extracted from the source data"), + @WritesAttribute(attribute="count.invalid.lines", description="If line by line validation, number of invalid lines extracted from the source data"), + @WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data") +}) public class ValidateCsv extends AbstractProcessor { private final static List allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate", @@ -542,6 +549,8 @@ public class ValidateCsv extends AbstractProcessor { if (valid.get()) { logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{validFF.get()}); session.getProvenanceReporter().route(validFF.get(), REL_VALID, "All " + totalCount.get() + " line(s) are valid"); + session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(totalCount.get())); + session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get())); session.transfer(validFF.get(), REL_VALID); session.remove(invalidFF.get()); session.remove(flowFile); @@ -552,13 +561,19 @@ public class ValidateCsv extends AbstractProcessor { logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'", new Object[]{okCount.get(), totalCount.get(), flowFile}); session.getProvenanceReporter().route(validFF.get(), REL_VALID, okCount.get() + " valid line(s)"); + session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get())); + session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(okCount.get())); session.transfer(validFF.get(), REL_VALID); session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)"); + session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString((totalCount.get() - okCount.get()))); + session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get())); session.transfer(invalidFF.get(), REL_INVALID); session.remove(flowFile); } else { logger.debug("All lines in {} are invalid; routing to 'invalid'", new Object[]{invalidFF.get()}); session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid"); + session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString(totalCount.get())); + session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get())); session.transfer(invalidFF.get(), REL_INVALID); session.remove(validFF.get()); session.remove(flowFile); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java index c2d4d3f014..5092ba82b1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateCsv.java @@ -90,14 +90,18 @@ public class TestValidateCsv { runner.setProperty(ValidateCsv.SCHEMA, "Unique()"); - runner.enqueue("John\r\nBob\r\nBob\r\nJohn"); + runner.enqueue("John\r\nBob\r\nBob\r\nJohn\r\nTom"); runner.run(); runner.assertTransferCount(ValidateCsv.REL_VALID, 1); runner.assertTransferCount(ValidateCsv.REL_INVALID, 1); - runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("John\r\nBob"); + runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("John\r\nBob\r\nTom"); + runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertAttributeEquals("count.total.lines", "5"); + runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertAttributeEquals("count.valid.lines", "3"); runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Bob\r\nJohn"); + runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("count.invalid.lines", "2"); + runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("count.total.lines", "5"); } @Test