mirror of https://github.com/apache/nifi.git
NIFI-4883: Updated ValidateRecord to allow an optional Record Writer to be configured for invalid records that differs from the Record Writer used for writing valid records
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #2600.
This commit is contained in:
parent
bbe79d2260
commit
94d444abb0
|
@ -112,6 +112,16 @@ public class ValidateRecord extends AbstractProcessor {
|
||||||
.identifiesControllerService(RecordSetWriterFactory.class)
|
.identifiesControllerService(RecordSetWriterFactory.class)
|
||||||
.required(true)
|
.required(true)
|
||||||
.build();
|
.build();
|
||||||
|
static final PropertyDescriptor INVALID_RECORD_WRITER = new PropertyDescriptor.Builder()
|
||||||
|
.name("invalid-record-writer")
|
||||||
|
.displayName("Record Writer for Invalid Records")
|
||||||
|
.description("If specified, this Controller Service will be used to write out any records that are invalid. "
|
||||||
|
+ "If not specified, the writer specified by the \"Record Writer\" property will be used. This is useful, for example, when the configured "
|
||||||
|
+ "Record Writer cannot write data that does not adhere to its schema (as is the case with Avro) or when it is desirable to keep invalid records "
|
||||||
|
+ "in their original format while converting valid records to another format.")
|
||||||
|
.identifiesControllerService(RecordSetWriterFactory.class)
|
||||||
|
.required(false)
|
||||||
|
.build();
|
||||||
static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
|
||||||
.name("schema-access-strategy")
|
.name("schema-access-strategy")
|
||||||
.displayName("Schema Access Strategy")
|
.displayName("Schema Access Strategy")
|
||||||
|
@ -186,6 +196,7 @@ public class ValidateRecord extends AbstractProcessor {
|
||||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||||
properties.add(RECORD_READER);
|
properties.add(RECORD_READER);
|
||||||
properties.add(RECORD_WRITER);
|
properties.add(RECORD_WRITER);
|
||||||
|
properties.add(INVALID_RECORD_WRITER);
|
||||||
properties.add(SCHEMA_ACCESS_STRATEGY);
|
properties.add(SCHEMA_ACCESS_STRATEGY);
|
||||||
properties.add(SCHEMA_REGISTRY);
|
properties.add(SCHEMA_REGISTRY);
|
||||||
properties.add(SCHEMA_NAME);
|
properties.add(SCHEMA_NAME);
|
||||||
|
@ -237,7 +248,10 @@ public class ValidateRecord extends AbstractProcessor {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
|
final RecordSetWriterFactory validRecordWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||||
|
final RecordSetWriterFactory invalidRecordWriterFactory = context.getProperty(INVALID_RECORD_WRITER).isSet()
|
||||||
|
? context.getProperty(INVALID_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class)
|
||||||
|
: validRecordWriterFactory;
|
||||||
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
|
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
|
||||||
|
|
||||||
final boolean allowExtraFields = context.getProperty(ALLOW_EXTRA_FIELDS).asBoolean();
|
final boolean allowExtraFields = context.getProperty(ALLOW_EXTRA_FIELDS).asBoolean();
|
||||||
|
@ -277,7 +291,7 @@ public class ValidateRecord extends AbstractProcessor {
|
||||||
validFlowFile = session.create(flowFile);
|
validFlowFile = session.create(flowFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
validWriter = writer = createIfNecessary(validWriter, writerFactory, session, validFlowFile, record.getSchema());
|
validWriter = writer = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, record.getSchema());
|
||||||
} else {
|
} else {
|
||||||
invalidCount++;
|
invalidCount++;
|
||||||
logValidationErrors(flowFile, recordCount, result);
|
logValidationErrors(flowFile, recordCount, result);
|
||||||
|
@ -286,7 +300,7 @@ public class ValidateRecord extends AbstractProcessor {
|
||||||
invalidFlowFile = session.create(flowFile);
|
invalidFlowFile = session.create(flowFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
invalidWriter = writer = createIfNecessary(invalidWriter, writerFactory, session, invalidFlowFile, record.getSchema());
|
invalidWriter = writer = createIfNecessary(invalidWriter, invalidRecordWriterFactory, session, invalidFlowFile, record.getSchema());
|
||||||
|
|
||||||
// Add all of the validation errors to our Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if
|
// Add all of the validation errors to our Set<ValidationError> but only keep up to MAX_VALIDATION_ERRORS because if
|
||||||
// we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event
|
// we keep too many then we both use up a lot of heap and risk outputting so much information in the Provenance Event
|
||||||
|
@ -380,7 +394,7 @@ public class ValidateRecord extends AbstractProcessor {
|
||||||
session.adjustCounter("Records Validated", recordCount, false);
|
session.adjustCounter("Records Validated", recordCount, false);
|
||||||
session.adjustCounter("Records Found Valid", validCount, false);
|
session.adjustCounter("Records Found Valid", validCount, false);
|
||||||
session.adjustCounter("Records Found Invalid", invalidCount, false);
|
session.adjustCounter("Records Found Invalid", invalidCount, false);
|
||||||
} catch (final IOException | MalformedRecordException | SchemaNotFoundException e) {
|
} catch (final Exception e) {
|
||||||
getLogger().error("Failed to process {}; will route to failure", new Object[] {flowFile, e});
|
getLogger().error("Failed to process {}; will route to failure", new Object[] {flowFile, e});
|
||||||
session.transfer(flowFile, REL_FAILURE);
|
session.transfer(flowFile, REL_FAILURE);
|
||||||
if (validFlowFile != null) {
|
if (validFlowFile != null) {
|
||||||
|
|
|
@ -17,10 +17,18 @@
|
||||||
|
|
||||||
package org.apache.nifi.processors.standard;
|
package org.apache.nifi.processors.standard;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
|
||||||
import org.apache.nifi.csv.CSVReader;
|
import org.apache.nifi.csv.CSVReader;
|
||||||
import org.apache.nifi.csv.CSVRecordSetWriter;
|
import org.apache.nifi.csv.CSVRecordSetWriter;
|
||||||
import org.apache.nifi.csv.CSVUtils;
|
import org.apache.nifi.csv.CSVUtils;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
import org.apache.nifi.schema.access.SchemaAccessUtils;
|
||||||
|
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -59,4 +67,74 @@ public class TestValidateRecord {
|
||||||
runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0).assertContentEquals(content);
|
runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0).assertContentEquals(content);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWriteFailureRoutesToFaliure() throws InitializationException {
|
||||||
|
final CSVReader csvReader = new CSVReader();
|
||||||
|
runner.addControllerService("reader", csvReader);
|
||||||
|
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
|
||||||
|
runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
|
||||||
|
runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
|
||||||
|
runner.enableControllerService(csvReader);
|
||||||
|
|
||||||
|
MockRecordWriter writer = new MockRecordWriter("header", false, 1);
|
||||||
|
runner.addControllerService("writer", writer);
|
||||||
|
runner.enableControllerService(writer);
|
||||||
|
|
||||||
|
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
|
||||||
|
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
|
||||||
|
|
||||||
|
final String content = "fieldA,fieldB,fieldC,fieldD,fieldE,fieldF\nvalueA,valueB,valueC,valueD,valueE,valueF\nvalueA,valueB,valueC,valueD,valueE,valueF\n";
|
||||||
|
runner.enqueue(content);
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(ValidateRecord.REL_FAILURE, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppropriateServiceUsedForInvalidRecords() throws InitializationException, UnsupportedEncodingException, IOException {
|
||||||
|
final String schema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc")), "UTF-8");
|
||||||
|
|
||||||
|
final CSVReader csvReader = new CSVReader();
|
||||||
|
runner.addControllerService("reader", csvReader);
|
||||||
|
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||||
|
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_TEXT, schema);
|
||||||
|
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "false");
|
||||||
|
runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
|
||||||
|
runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
|
||||||
|
runner.enableControllerService(csvReader);
|
||||||
|
|
||||||
|
final MockRecordWriter validWriter = new MockRecordWriter("valid", false);
|
||||||
|
runner.addControllerService("writer", validWriter);
|
||||||
|
runner.enableControllerService(validWriter);
|
||||||
|
|
||||||
|
final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
|
||||||
|
runner.addControllerService("invalid-writer", invalidWriter);
|
||||||
|
runner.enableControllerService(invalidWriter);
|
||||||
|
|
||||||
|
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
|
||||||
|
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
|
||||||
|
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
|
||||||
|
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
|
||||||
|
|
||||||
|
final String content = "1, John Doe\n"
|
||||||
|
+ "2, Jane Doe\n"
|
||||||
|
+ "Three, Jack Doe\n";
|
||||||
|
|
||||||
|
runner.enqueue(content);
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
|
||||||
|
runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
|
||||||
|
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
|
||||||
|
|
||||||
|
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
|
||||||
|
validFlowFile.assertAttributeEquals("record.count", "2");
|
||||||
|
validFlowFile.assertContentEquals("valid\n"
|
||||||
|
+ "1,John Doe\n"
|
||||||
|
+ "2,Jane Doe\n");
|
||||||
|
|
||||||
|
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
|
||||||
|
invalidFlowFile.assertAttributeEquals("record.count", "1");
|
||||||
|
invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue