NIFI-9335: Updated AvroTypeUtil#createAvroRecord to ensure that if the given Avro Schema contains a field whose value is defaulted, the produced Avro Record has that value populated. Also added a unit test to verify behavior. Ran performance test (which is ignored and must be manually enabled) - about 2.1 seconds before the change and about 2.5 seconds after the change for each iteration.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5483
This commit is contained in:
Mark Payne 2021-10-26 10:05:31 -04:00 committed by Matthew Burgess
parent 8506a6012f
commit 1191e511a5
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 33 additions and 2 deletions

View File

@ -619,6 +619,19 @@ public class AvroTypeUtil {
rec.put(fieldName, converted);
}
// see if the Avro schema has any fields that aren't in the RecordSchema, and if those fields have a default
// value then we want to populate it in the GenericRecord being produced
for (final Field field : avroSchema.getFields()) {
if (field.defaultVal() == null) {
continue;
}
final Optional<RecordField> recordField = recordSchema.getField(field.name());
if (!recordField.isPresent() && rec.get(field.name()) == null) {
rec.put(field.name(), field.defaultVal());
}
}
return rec;
}

View File

@ -23,6 +23,7 @@ import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
@ -31,7 +32,6 @@ import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.util.Utf8;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
@ -105,7 +105,25 @@ public class TestAvroTypeUtil {
}
@Test
public void testCreateAvroSchemaPrimitiveTypes() throws SchemaNotFoundException {
public void testAvroDefaultValueWithNoFieldInRecordOrSchema() throws IOException {
    // Target Avro schema: a required "name" plus a "color" string field that defaults to "blue".
    final Schema schemaWithDefault = SchemaBuilder.record("person").namespace("nifi")
        .fields()
        .requiredString("name")
        .name("color").type().stringType().stringDefault("blue")
        .endRecord();

    // The NiFi record and its schema carry only "name"; "color" is absent from both.
    final List<RecordField> nameOnlyFields = new ArrayList<>();
    nameOnlyFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
    final RecordSchema nameOnlySchema = new SimpleRecordSchema(nameOnlyFields);
    final org.apache.nifi.serialization.record.Record nifiRecord =
        new MapRecord(nameOnlySchema, Collections.singletonMap("name", "John Doe"));

    final GenericRecord converted = AvroTypeUtil.createAvroRecord(nifiRecord, schemaWithDefault);

    // The supplied value is carried over, and the missing field is filled from the Avro default.
    assertEquals("John Doe", converted.get("name"));
    assertEquals("blue", converted.get("color"));
}
@Test
public void testCreateAvroSchemaPrimitiveTypes() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("int", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("long", RecordFieldType.LONG.getDataType()));