diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java index 06d7db7ad6..5cae54be46 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java @@ -112,6 +112,16 @@ public class ValidateRecord extends AbstractProcessor { .identifiesControllerService(RecordSetWriterFactory.class) .required(true) .build(); + static final PropertyDescriptor INVALID_RECORD_WRITER = new PropertyDescriptor.Builder() + .name("invalid-record-writer") + .displayName("Record Writer for Invalid Records") + .description("If specified, this Controller Service will be used to write out any records that are invalid. " + + "If not specified, the writer specified by the \"Record Writer\" property will be used. 
This is useful, for example, when the configured " + + "Record Writer cannot write data that does not adhere to its schema (as is the case with Avro) or when it is desirable to keep invalid records " + + "in their original format while converting valid records to another format.") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(false) + .build(); static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder() .name("schema-access-strategy") .displayName("Schema Access Strategy") @@ -186,6 +196,7 @@ public class ValidateRecord extends AbstractProcessor { final List properties = new ArrayList<>(); properties.add(RECORD_READER); properties.add(RECORD_WRITER); + properties.add(INVALID_RECORD_WRITER); properties.add(SCHEMA_ACCESS_STRATEGY); properties.add(SCHEMA_REGISTRY); properties.add(SCHEMA_NAME); @@ -237,7 +248,10 @@ public class ValidateRecord extends AbstractProcessor { return; } - final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSetWriterFactory validRecordWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSetWriterFactory invalidRecordWriterFactory = context.getProperty(INVALID_RECORD_WRITER).isSet() + ? 
context.getProperty(INVALID_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class) + : validRecordWriterFactory; final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final boolean allowExtraFields = context.getProperty(ALLOW_EXTRA_FIELDS).asBoolean(); @@ -277,7 +291,7 @@ public class ValidateRecord extends AbstractProcessor { validFlowFile = session.create(flowFile); } - validWriter = writer = createIfNecessary(validWriter, writerFactory, session, validFlowFile, record.getSchema()); + validWriter = writer = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, record.getSchema()); } else { invalidCount++; logValidationErrors(flowFile, recordCount, result); @@ -286,7 +300,7 @@ public class ValidateRecord extends AbstractProcessor { invalidFlowFile = session.create(flowFile); } - invalidWriter = writer = createIfNecessary(invalidWriter, writerFactory, session, invalidFlowFile, record.getSchema()); + invalidWriter = writer = createIfNecessary(invalidWriter, invalidRecordWriterFactory, session, invalidFlowFile, record.getSchema()); // Add all of the validation errors to our Set but only keep up to MAX_VALIDATION_ERRORS because if // we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event @@ -380,7 +394,7 @@ public class ValidateRecord extends AbstractProcessor { session.adjustCounter("Records Validated", recordCount, false); session.adjustCounter("Records Found Valid", validCount, false); session.adjustCounter("Records Found Invalid", invalidCount, false); - } catch (final IOException | MalformedRecordException | SchemaNotFoundException e) { + } catch (final Exception e) { getLogger().error("Failed to process {}; will route to failure", new Object[] {flowFile, e}); session.transfer(flowFile, REL_FAILURE); if (validFlowFile != null) { diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java index 54362a0e80..728875b894 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java @@ -17,10 +17,18 @@ package org.apache.nifi.processors.standard; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.file.Files; +import java.nio.file.Paths; + import org.apache.nifi.csv.CSVReader; import org.apache.nifi.csv.CSVRecordSetWriter; import org.apache.nifi.csv.CSVUtils; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Before; @@ -59,4 +67,74 @@ public class TestValidateRecord { runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0).assertContentEquals(content); } + + @Test + public void testWriteFailureRoutesToFailure() throws InitializationException { + final CSVReader csvReader = new CSVReader(); + runner.addControllerService("reader", csvReader); + runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true"); + runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue()); + runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false"); + runner.enableControllerService(csvReader); + + MockRecordWriter writer = new MockRecordWriter("header", false, 1); + runner.addControllerService("writer", writer); + 
runner.enableControllerService(writer); + + runner.setProperty(ValidateRecord.RECORD_READER, "reader"); + runner.setProperty(ValidateRecord.RECORD_WRITER, "writer"); + + final String content = "fieldA,fieldB,fieldC,fieldD,fieldE,fieldF\nvalueA,valueB,valueC,valueD,valueE,valueF\nvalueA,valueB,valueC,valueD,valueE,valueF\n"; + runner.enqueue(content); + runner.run(); + runner.assertAllFlowFilesTransferred(ValidateRecord.REL_FAILURE, 1); + } + + @Test + public void testAppropriateServiceUsedForInvalidRecords() throws InitializationException, UnsupportedEncodingException, IOException { + final String schema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc")), "UTF-8"); + + final CSVReader csvReader = new CSVReader(); + runner.addControllerService("reader", csvReader); + runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_TEXT, schema); + runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "false"); + runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue()); + runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false"); + runner.enableControllerService(csvReader); + + final MockRecordWriter validWriter = new MockRecordWriter("valid", false); + runner.addControllerService("writer", validWriter); + runner.enableControllerService(validWriter); + + final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true); + runner.addControllerService("invalid-writer", invalidWriter); + runner.enableControllerService(invalidWriter); + + runner.setProperty(ValidateRecord.RECORD_READER, "reader"); + runner.setProperty(ValidateRecord.RECORD_WRITER, "writer"); + runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer"); + runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false"); + + final String content = "1, John Doe\n" + + 
"2, Jane Doe\n" + + "Three, Jack Doe\n"; + + runner.enqueue(content); + runner.run(); + + runner.assertTransferCount(ValidateRecord.REL_VALID, 1); + runner.assertTransferCount(ValidateRecord.REL_INVALID, 1); + runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0); + + final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0); + validFlowFile.assertAttributeEquals("record.count", "2"); + validFlowFile.assertContentEquals("valid\n" + + "1,John Doe\n" + + "2,Jane Doe\n"); + + final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0); + invalidFlowFile.assertAttributeEquals("record.count", "1"); + invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n"); + } }