NIFI-9698: When creating an Avro schema, ensure that any default value is converted from what is returned by RecordField.getDefaultValue() to what Avro requires.

Signed-off-by: Chris Sampson <chriss@apache.org>

This closes #5776
This commit is contained in:
Mark Payne 2022-02-16 11:59:56 -05:00 committed by Chris Sampson
parent 2aa6bd1e13
commit 4d8c79d7f3
2 changed files with 28 additions and 3 deletions

View File

@ -124,16 +124,18 @@ public class AvroTypeUtil {
return avroSchema; return avroSchema;
} }
private static Field buildAvroField(final RecordField recordField, String fieldNamePrefix) { private static Field buildAvroField(final RecordField recordField, final String fieldNamePrefix) {
final Schema schema = buildAvroSchema(recordField.getDataType(), recordField.getFieldName(), fieldNamePrefix, recordField.isNullable()); final Schema schema = buildAvroSchema(recordField.getDataType(), recordField.getFieldName(), fieldNamePrefix, recordField.isNullable());
final Field field; final Field field;
final String recordFieldName = recordField.getFieldName(); final String recordFieldName = recordField.getFieldName();
if (isValidAvroFieldName(recordFieldName)) { if (isValidAvroFieldName(recordFieldName)) {
field = new Field(recordField.getFieldName(), schema, null, recordField.getDefaultValue()); final Object avroDefaultValue = convertToAvroObject(recordField.getDefaultValue(), schema);
field = new Field(recordField.getFieldName(), schema, null, avroDefaultValue);
} else { } else {
final String validName = createValidAvroFieldName(recordField.getFieldName()); final String validName = createValidAvroFieldName(recordField.getFieldName());
field = new Field(validName, schema, null, recordField.getDefaultValue()); final Object avroDefaultValue = convertToAvroObject(recordField.getDefaultValue(), schema);
field = new Field(validName, schema, null, avroDefaultValue);
field.addAlias(recordField.getFieldName()); field.addAlias(recordField.getFieldName());
} }

View File

@ -122,6 +122,29 @@ public class TestAvroTypeUtil {
assertEquals("blue", avroRecord.get("color")); assertEquals("blue", avroRecord.get("color"));
} }
@Test
public void testExtractAvroSchemaWithDefaults() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("string", RecordFieldType.STRING.getDataType(), "hello"));
fields.add(new RecordField("int", RecordFieldType.INT.getDataType(), 17));
fields.add(new RecordField("long", RecordFieldType.LONG.getDataType(), 42));
fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType(), 2.4F));
fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType(), 28.1D));
fields.add(new RecordField("stringArray", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()), new String[0]));
fields.add(new RecordField("intArray", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()), new Integer[0]));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
assertEquals("hello", avroSchema.getField("string").defaultVal());
assertEquals(17, avroSchema.getField("int").defaultVal());
assertEquals(42L, avroSchema.getField("long").defaultVal());
assertEquals(2.4D, (double) avroSchema.getField("float").defaultVal(), 0.002D); // Even though we provide a Float, avro converts it into a Double value.
assertEquals(28.1D, (double) avroSchema.getField("double").defaultVal(), 0.002D);
assertEquals(new ArrayList<String>(), avroSchema.getField("stringArray").defaultVal());
assertEquals(new ArrayList<Integer>(), avroSchema.getField("intArray").defaultVal());
}
@Test @Test
public void testAvroDefaultValueWithFieldInSchemaButNotRecord() throws IOException { public void testAvroDefaultValueWithFieldInSchemaButNotRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>(); final List<RecordField> fields = new ArrayList<>();