mirror of https://github.com/apache/nifi.git
NIFI-4579: Fix ValidateRecord type coercing
Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #2794
This commit is contained in:
parent
c51512f5e3
commit
0efddf47d5
|
@ -109,7 +109,9 @@ public class ValidateRecord extends AbstractProcessor {
|
|||
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
|
||||
.name("record-writer")
|
||||
.displayName("Record Writer")
|
||||
.description("Specifies the Controller Service to use for writing out the records")
|
||||
.description("Specifies the Controller Service to use for writing out the records. "
|
||||
+ "Regardless of the Controller Service schema access configuration, "
|
||||
+ "the schema that is used to validate record is used to write the valid results.")
|
||||
.identifiesControllerService(RecordSetWriterFactory.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
@ -117,7 +119,8 @@ public class ValidateRecord extends AbstractProcessor {
|
|||
.name("invalid-record-writer")
|
||||
.displayName("Record Writer for Invalid Records")
|
||||
.description("If specified, this Controller Service will be used to write out any records that are invalid. "
|
||||
+ "If not specified, the writer specified by the \"Record Writer\" property will be used. This is useful, for example, when the configured "
|
||||
+ "If not specified, the writer specified by the \"Record Writer\" property will be used with the schema used to read the input records. "
|
||||
+ "This is useful, for example, when the configured "
|
||||
+ "Record Writer cannot write data that does not adhere to its schema (as is the case with Avro) or when it is desirable to keep invalid records "
|
||||
+ "in their original format while converting valid records to another format.")
|
||||
.identifiesControllerService(RecordSetWriterFactory.class)
|
||||
|
@ -161,7 +164,7 @@ public class ValidateRecord extends AbstractProcessor {
|
|||
.displayName("Allow Extra Fields")
|
||||
.description("If the incoming data has fields that are not present in the schema, this property determines whether or not the Record is valid. "
|
||||
+ "If true, the Record is still valid. If false, the Record will be invalid due to the extra fields.")
|
||||
.expressionLanguageSupported(false)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.required(true)
|
||||
|
@ -172,7 +175,7 @@ public class ValidateRecord extends AbstractProcessor {
|
|||
.description("If the incoming data has a Record where a field is not of the correct type, this property determine whether how to handle the Record. "
|
||||
+ "If true, the Record will still be considered invalid. If false, the Record will be considered valid and the field will be coerced into the "
|
||||
+ "correct type (if possible, according to the type coercion supported by the Record Writer).")
|
||||
.expressionLanguageSupported(false)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.required(true)
|
||||
|
@ -292,7 +295,8 @@ public class ValidateRecord extends AbstractProcessor {
|
|||
validFlowFile = session.create(flowFile);
|
||||
}
|
||||
|
||||
validWriter = writer = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, record.getSchema());
|
||||
validWriter = writer = createIfNecessary(validWriter, validRecordWriterFactory, session, validFlowFile, validationSchema);
|
||||
|
||||
} else {
|
||||
invalidCount++;
|
||||
logValidationErrors(flowFile, recordCount, result);
|
||||
|
@ -435,13 +439,13 @@ public class ValidateRecord extends AbstractProcessor {
|
|||
}
|
||||
|
||||
private RecordSetWriter createIfNecessary(final RecordSetWriter writer, final RecordSetWriterFactory factory, final ProcessSession session,
|
||||
final FlowFile flowFile, final RecordSchema inputSchema) throws SchemaNotFoundException, IOException {
|
||||
final FlowFile flowFile, final RecordSchema outputSchema) throws SchemaNotFoundException, IOException {
|
||||
if (writer != null) {
|
||||
return writer;
|
||||
}
|
||||
|
||||
final OutputStream out = session.write(flowFile);
|
||||
final RecordSetWriter created = factory.createWriter(getLogger(), inputSchema, out);
|
||||
final RecordSetWriter created = factory.createWriter(getLogger(), outputSchema, out);
|
||||
created.beginRecordSet();
|
||||
return created;
|
||||
}
|
||||
|
|
|
@ -17,23 +17,38 @@
|
|||
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;

import org.apache.nifi.avro.AvroReader;
import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVRecordSetWriter;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestValidateRecord {
|
||||
|
||||
private TestRunner runner;
|
||||
|
@ -137,4 +152,212 @@ public class TestValidateRecord {
|
|||
invalidFlowFile.assertAttributeEquals("record.count", "1");
|
||||
invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStrictTypeCheck() throws InitializationException, IOException {
|
||||
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")), "UTF-8");
|
||||
|
||||
final CSVReader csvReader = new CSVReader();
|
||||
runner.addControllerService("reader", csvReader);
|
||||
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
|
||||
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
|
||||
runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true");
|
||||
runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
|
||||
runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
|
||||
runner.enableControllerService(csvReader);
|
||||
|
||||
final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
|
||||
runner.addControllerService("writer", validWriter);
|
||||
runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute");
|
||||
runner.enableControllerService(validWriter);
|
||||
|
||||
final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
|
||||
runner.addControllerService("invalid-writer", invalidWriter);
|
||||
runner.enableControllerService(invalidWriter);
|
||||
|
||||
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
|
||||
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
|
||||
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
|
||||
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
|
||||
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
|
||||
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "true");
|
||||
|
||||
// The validationSchema expects 'id' to be int, but CSVReader reads it as 'string'
|
||||
// with strict type check, the type difference is not allowed.
|
||||
final String content = "id, firstName, lastName\n"
|
||||
+ "1, John, Doe\n"
|
||||
+ "2, Jane, Doe\n"
|
||||
+ "Three, Jack, Doe\n";
|
||||
|
||||
runner.enqueue(content);
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(ValidateRecord.REL_VALID, 0);
|
||||
runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
|
||||
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
|
||||
|
||||
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
|
||||
invalidFlowFile.assertAttributeEquals("record.count", "3");
|
||||
final String expectedInvalidContents = "invalid\n"
|
||||
+ "\"1\",\"John\",\"Doe\"\n"
|
||||
+ "\"2\",\"Jane\",\"Doe\"\n"
|
||||
+ "\"Three\",\"Jack\",\"Doe\"\n";
|
||||
invalidFlowFile.assertContentEquals(expectedInvalidContents);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonStrictTypeCheckWithAvroWriter() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
|
||||
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")), "UTF-8");
|
||||
|
||||
final CSVReader csvReader = new CSVReader();
|
||||
runner.addControllerService("reader", csvReader);
|
||||
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
|
||||
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
|
||||
runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true");
|
||||
runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
|
||||
runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
|
||||
runner.enableControllerService(csvReader);
|
||||
|
||||
final AvroRecordSetWriter validWriter = new AvroRecordSetWriter();
|
||||
runner.addControllerService("writer", validWriter);
|
||||
runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute");
|
||||
runner.enableControllerService(validWriter);
|
||||
|
||||
final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
|
||||
runner.addControllerService("invalid-writer", invalidWriter);
|
||||
runner.enableControllerService(invalidWriter);
|
||||
|
||||
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
|
||||
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
|
||||
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
|
||||
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
|
||||
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
|
||||
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false");
|
||||
|
||||
// The validationSchema expects 'id' to be int, but CSVReader reads it as 'string'
|
||||
// with non-strict type check, the type difference should be accepted, and results should be written as 'int'.
|
||||
final String content = "id, firstName, lastName\n"
|
||||
+ "1, John, Doe\n"
|
||||
+ "2, Jane, Doe\n"
|
||||
+ "Three, Jack, Doe\n";
|
||||
|
||||
runner.enqueue(content);
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
|
||||
runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
|
||||
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
|
||||
|
||||
final AvroReader avroReader = new AvroReader();
|
||||
runner.addControllerService("avroReader", avroReader);
|
||||
runner.setProperty(avroReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.enableControllerService(avroReader);
|
||||
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
|
||||
try (
|
||||
final ByteArrayInputStream resultContentStream = new ByteArrayInputStream(validFlowFile.toByteArray());
|
||||
final RecordReader recordReader = avroReader.createRecordReader(validFlowFile.getAttributes(), resultContentStream, runner.getLogger());
|
||||
) {
|
||||
final RecordSchema resultSchema = recordReader.getSchema();
|
||||
assertEquals(3, resultSchema.getFieldCount());
|
||||
|
||||
// The id field should be an int field.
|
||||
final Optional<RecordField> idField = resultSchema.getField("id");
|
||||
assertTrue(idField.isPresent());
|
||||
assertEquals(RecordFieldType.INT, idField.get().getDataType().getFieldType());
|
||||
|
||||
validFlowFile.assertAttributeEquals("record.count", "2");
|
||||
|
||||
Record record = recordReader.nextRecord();
|
||||
assertEquals(1, record.getValue("id"));
|
||||
assertEquals("John", record.getValue("firstName"));
|
||||
assertEquals("Doe", record.getValue("lastName"));
|
||||
|
||||
record = recordReader.nextRecord();
|
||||
assertEquals(2, record.getValue("id"));
|
||||
assertEquals("Jane", record.getValue("firstName"));
|
||||
assertEquals("Doe", record.getValue("lastName"));
|
||||
}
|
||||
|
||||
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
|
||||
invalidFlowFile.assertAttributeEquals("record.count", "1");
|
||||
final String expectedInvalidContents = "invalid\n"
|
||||
+ "\"Three\",\"Jack\",\"Doe\"\n";
|
||||
invalidFlowFile.assertContentEquals(expectedInvalidContents);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test case demonstrates the limitation on JsonRecordSetWriter type-coercing when strict type check is disabled.
|
||||
* Since WriteJsonResult.writeRawRecord doesn't use record schema,
|
||||
* type coercing does not happen with JsonWriter even if strict type check is disabled.
|
||||
*
|
||||
* E.g. When an input "1" as string is given, and output field schema is int:
|
||||
* <ul>
|
||||
* <li>Expected result: "id": 1 (without quote)</li>
|
||||
* <li>Actual result: "id": "1" (with quote)</li>
|
||||
* </ul>
|
||||
*/
|
||||
@Test
|
||||
public void testNonStrictTypeCheckWithJsonWriter() throws InitializationException, IOException {
|
||||
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")), "UTF-8");
|
||||
|
||||
final CSVReader csvReader = new CSVReader();
|
||||
runner.addControllerService("reader", csvReader);
|
||||
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
|
||||
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
|
||||
runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true");
|
||||
runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
|
||||
runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
|
||||
runner.enableControllerService(csvReader);
|
||||
|
||||
final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
|
||||
runner.addControllerService("writer", validWriter);
|
||||
runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute");
|
||||
runner.enableControllerService(validWriter);
|
||||
|
||||
final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
|
||||
runner.addControllerService("invalid-writer", invalidWriter);
|
||||
runner.enableControllerService(invalidWriter);
|
||||
|
||||
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
|
||||
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
|
||||
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
|
||||
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
|
||||
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
|
||||
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false");
|
||||
|
||||
// The validationSchema expects 'id' to be int, but CSVReader reads it as 'string'
|
||||
// with non-strict type check, the type difference should be accepted, and results should be written as 'int'.
|
||||
final String content = "id, firstName, lastName\n"
|
||||
+ "1, John, Doe\n"
|
||||
+ "2, Jane, Doe\n"
|
||||
+ "Three, Jack, Doe\n";
|
||||
|
||||
runner.enqueue(content);
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
|
||||
runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
|
||||
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
|
||||
|
||||
/*
|
||||
TODO: JsonRecordSetWriter does not coerce value. Should we fix this??
|
||||
*/
|
||||
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
|
||||
validFlowFile.assertAttributeEquals("record.count", "2");
|
||||
final String expectedValidContents = "[" +
|
||||
"{\"id\":\"1\",\"firstName\":\"John\",\"lastName\":\"Doe\"}," +
|
||||
"{\"id\":\"2\",\"firstName\":\"Jane\",\"lastName\":\"Doe\"}" +
|
||||
"]";
|
||||
validFlowFile.assertContentEquals(expectedValidContents);
|
||||
|
||||
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
|
||||
invalidFlowFile.assertAttributeEquals("record.count", "1");
|
||||
final String expectedInvalidContents = "invalid\n"
|
||||
+ "\"Three\",\"Jack\",\"Doe\"\n";
|
||||
invalidFlowFile.assertContentEquals(expectedInvalidContents);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue