mirror of https://github.com/apache/nifi.git
NIFI-4577 - ValidateCsv - add attributes to indicate number of valid/invalid lines
Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #2268
This commit is contained in:
parent
af3a578711
commit
9e9c129c21
|
@ -35,6 +35,8 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
|
|||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
|
@ -86,6 +88,11 @@ import org.supercsv.prefs.CsvPreference;
|
|||
@Tags({"csv", "schema", "validation"})
|
||||
@CapabilityDescription("Validates the contents of FlowFiles against a user-specified CSV schema. " +
|
||||
"Take a look at the additional documentation of this processor for some schema examples.")
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute="count.valid.lines", description="If line by line validation, number of valid lines extracted from the source data"),
|
||||
@WritesAttribute(attribute="count.invalid.lines", description="If line by line validation, number of invalid lines extracted from the source data"),
|
||||
@WritesAttribute(attribute="count.total.lines", description="If line by line validation, total number of lines in the source data")
|
||||
})
|
||||
public class ValidateCsv extends AbstractProcessor {
|
||||
|
||||
private final static List<String> allowedOperators = Arrays.asList("ParseBigDecimal", "ParseBool", "ParseChar", "ParseDate",
|
||||
|
@ -542,6 +549,8 @@ public class ValidateCsv extends AbstractProcessor {
|
|||
if (valid.get()) {
|
||||
logger.debug("Successfully validated {} against schema; routing to 'valid'", new Object[]{validFF.get()});
|
||||
session.getProvenanceReporter().route(validFF.get(), REL_VALID, "All " + totalCount.get() + " line(s) are valid");
|
||||
session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(totalCount.get()));
|
||||
session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
|
||||
session.transfer(validFF.get(), REL_VALID);
|
||||
session.remove(invalidFF.get());
|
||||
session.remove(flowFile);
|
||||
|
@ -552,13 +561,19 @@ public class ValidateCsv extends AbstractProcessor {
|
|||
logger.debug("Successfully validated {}/{} line(s) in {} against schema; routing valid lines to 'valid' and invalid lines to 'invalid'",
|
||||
new Object[]{okCount.get(), totalCount.get(), flowFile});
|
||||
session.getProvenanceReporter().route(validFF.get(), REL_VALID, okCount.get() + " valid line(s)");
|
||||
session.putAttribute(validFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
|
||||
session.putAttribute(validFF.get(), "count.valid.lines", Integer.toString(okCount.get()));
|
||||
session.transfer(validFF.get(), REL_VALID);
|
||||
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, (totalCount.get() - okCount.get()) + " invalid line(s)");
|
||||
session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString((totalCount.get() - okCount.get())));
|
||||
session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
|
||||
session.transfer(invalidFF.get(), REL_INVALID);
|
||||
session.remove(flowFile);
|
||||
} else {
|
||||
logger.debug("All lines in {} are invalid; routing to 'invalid'", new Object[]{invalidFF.get()});
|
||||
session.getProvenanceReporter().route(invalidFF.get(), REL_INVALID, "All " + totalCount.get() + " line(s) are invalid");
|
||||
session.putAttribute(invalidFF.get(), "count.invalid.lines", Integer.toString(totalCount.get()));
|
||||
session.putAttribute(invalidFF.get(), "count.total.lines", Integer.toString(totalCount.get()));
|
||||
session.transfer(invalidFF.get(), REL_INVALID);
|
||||
session.remove(validFF.get());
|
||||
session.remove(flowFile);
|
||||
|
|
|
@ -90,14 +90,18 @@ public class TestValidateCsv {
|
|||
|
||||
runner.setProperty(ValidateCsv.SCHEMA, "Unique()");
|
||||
|
||||
runner.enqueue("John\r\nBob\r\nBob\r\nJohn");
|
||||
runner.enqueue("John\r\nBob\r\nBob\r\nJohn\r\nTom");
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(ValidateCsv.REL_VALID, 1);
|
||||
runner.assertTransferCount(ValidateCsv.REL_INVALID, 1);
|
||||
|
||||
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("John\r\nBob");
|
||||
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertContentEquals("John\r\nBob\r\nTom");
|
||||
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertAttributeEquals("count.total.lines", "5");
|
||||
runner.getFlowFilesForRelationship(ValidateCsv.REL_VALID).get(0).assertAttributeEquals("count.valid.lines", "3");
|
||||
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertContentEquals("Bob\r\nJohn");
|
||||
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("count.invalid.lines", "2");
|
||||
runner.getFlowFilesForRelationship(ValidateCsv.REL_INVALID).get(0).assertAttributeEquals("count.total.lines", "5");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue