mirror of https://github.com/apache/nifi.git
NIFI-6963: Fix ValidateRecord handling of missing required array fields
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #3948.
This commit is contained in:
parent
209c560ff5
commit
f4ef45678e
|
@ -626,7 +626,7 @@ public class AvroTypeUtil {
|
|||
recordFields.add(new RecordField(fieldName, dataType, field.aliases(), nullable));
|
||||
} else {
|
||||
Object defaultValue = field.defaultVal();
|
||||
if (fieldSchema.getType() == Schema.Type.ARRAY && !DataTypeUtils.isArrayTypeCompatible(defaultValue, ((ArrayDataType) dataType).getElementType())) {
|
||||
if (defaultValue != null && fieldSchema.getType() == Schema.Type.ARRAY && !DataTypeUtils.isArrayTypeCompatible(defaultValue, ((ArrayDataType) dataType).getElementType())) {
|
||||
defaultValue = defaultValue instanceof List ? ((List<?>) defaultValue).toArray() : new Object[0];
|
||||
}
|
||||
recordFields.add(new RecordField(fieldName, dataType, defaultValue, field.aliases(), nullable));
|
||||
|
|
|
@ -570,6 +570,9 @@
|
|||
<exclude>src/test/resources/TestForkRecord/schema/schema.avsc</exclude>
|
||||
<exclude>src/test/resources/TestConvertRecord/schema/person.avsc</exclude>
|
||||
<exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
|
||||
<exclude>src/test/resources/TestValidateRecord/missing-array.json</exclude>
|
||||
<exclude>src/test/resources/TestValidateRecord/missing-array.avsc</exclude>
|
||||
<exclude>src/test/resources/TestValidateRecord/missing-array-with-default.avsc</exclude>
|
||||
<exclude>src/test/resources/TestValidateRecord/nested-map-input.json</exclude>
|
||||
<exclude>src/test/resources/TestValidateRecord/nested-map-schema.avsc</exclude>
|
||||
<exclude>src/test/resources/TestValidateRecord/timestamp.avsc</exclude>
|
||||
|
|
|
@ -412,6 +412,80 @@ public class TestValidateRecord {
|
|||
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateMissingRequiredArray() throws InitializationException, IOException {
|
||||
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/missing-array.avsc")), StandardCharsets.UTF_8);
|
||||
|
||||
final JsonTreeReader jsonReader = new JsonTreeReader();
|
||||
runner.addControllerService("reader", jsonReader);
|
||||
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
|
||||
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
|
||||
runner.enableControllerService(jsonReader);
|
||||
|
||||
final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
|
||||
runner.addControllerService("writer", validWriter);
|
||||
runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute");
|
||||
runner.enableControllerService(validWriter);
|
||||
|
||||
final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
|
||||
runner.addControllerService("invalid-writer", invalidWriter);
|
||||
runner.enableControllerService(invalidWriter);
|
||||
|
||||
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
|
||||
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
|
||||
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
|
||||
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
|
||||
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "true");
|
||||
|
||||
// The record is invalid due to not containing the required array from the schema
|
||||
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false");
|
||||
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/missing-array.json"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(ValidateRecord.REL_VALID, 0);
|
||||
runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
|
||||
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
|
||||
runner.clearTransferState();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateMissingRequiredArrayWithDefault() throws InitializationException, IOException {
|
||||
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/missing-array-with-default.avsc")), StandardCharsets.UTF_8);
|
||||
|
||||
final JsonTreeReader jsonReader = new JsonTreeReader();
|
||||
runner.addControllerService("reader", jsonReader);
|
||||
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
|
||||
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
|
||||
runner.enableControllerService(jsonReader);
|
||||
|
||||
final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
|
||||
runner.addControllerService("writer", validWriter);
|
||||
runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute");
|
||||
runner.enableControllerService(validWriter);
|
||||
|
||||
final MockRecordWriter invalidWriter = new MockRecordWriter("invalid", true);
|
||||
runner.addControllerService("invalid-writer", invalidWriter);
|
||||
runner.enableControllerService(invalidWriter);
|
||||
|
||||
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
|
||||
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
|
||||
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
|
||||
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
|
||||
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "true");
|
||||
|
||||
// The record is invalid due to not containing the required array from the schema
|
||||
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false");
|
||||
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/missing-array.json"));
|
||||
runner.run();
|
||||
|
||||
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
|
||||
runner.assertTransferCount(ValidateRecord.REL_INVALID, 0);
|
||||
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
|
||||
runner.clearTransferState();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testValidateJsonTimestamp() throws IOException, InitializationException {
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
{
|
||||
"name": "Test",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "Records",
|
||||
"type": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"name": "Records_record",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "id",
|
||||
"type": "string"
|
||||
}
|
||||
]
|
||||
}
|
||||
}, "default": []
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
{
|
||||
"name": "Test",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "Records",
|
||||
"type": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"name": "Records_record",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "id",
|
||||
"type": "string"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
{
|
||||
"NotRecords": [{
|
||||
"id": "1"
|
||||
}]
|
||||
}
|
Loading…
Reference in New Issue