NIFI-8609: Optimized AvroTypeUtil Record creation and conversion

Added a unit test that is ignored by default so that it can be run manually to measure performance before/after changes to AvroTypeUtil. Updated AvroTypeUtil to be more efficient by not using Record.getValue() and instead iterating over the Map of values directly. getValue() is less efficient here: we know the RecordFields we are iterating over exist in the schema, since they are retrieved from it directly, yet with getValue() any null values still have to be looked up by aliases - a step that can be skipped in this situation. Also avoided looking for fields that exist in the Avro Schema and not in the RecordSchema just to set default values on the GenericRecord - there's no need to set them if they are default values.

This closes #5080

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2021-05-17 17:35:50 -04:00 committed by exceptionfactory
parent 8004aa5e6e
commit e06afbdd22
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 78 additions and 21 deletions

View File

@ -585,27 +585,37 @@ public class AvroTypeUtil {
final GenericRecord rec = new GenericData.Record(avroSchema);
final RecordSchema recordSchema = record.getSchema();
for (final RecordField recordField : recordSchema.getFields()) {
final Object rawValue = record.getValue(recordField);
Pair<String, Field> fieldPair = lookupField(avroSchema, recordField);
final String fieldName = fieldPair.getLeft();
final Field field = fieldPair.getRight();
if (field == null) {
final Map<String, Object> recordValues = record.toMap();
for (final Map.Entry<String, Object> entry : recordValues.entrySet()) {
final Object rawValue = entry.getValue();
if (rawValue == null) {
continue;
}
final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName, charset);
rec.put(field.name(), converted);
}
// see if the Avro schema has any fields that aren't in the RecordSchema, and if those fields have a default
// value then we want to populate it in the GenericRecord being produced
for (final Field field : avroSchema.getFields()) {
final Optional<RecordField> recordField = recordSchema.getField(field.name());
if (!recordField.isPresent() && rec.get(field.name()) == null && field.defaultVal() != null) {
rec.put(field.name(), field.defaultVal());
final String rawFieldName = entry.getKey();
final Optional<RecordField> optionalRecordField = recordSchema.getField(rawFieldName);
if (!optionalRecordField.isPresent()) {
continue;
}
final RecordField recordField = optionalRecordField.get();
final Field field;
final Field avroField = avroSchema.getField(rawFieldName);
if (avroField == null) {
final Pair<String, Field> fieldPair = lookupField(avroSchema, recordField);
field = fieldPair.getRight();
if (field == null) {
continue;
}
} else {
field = avroField;
}
final String fieldName = field.name();
final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName, charset);
rec.put(fieldName, converted);
}
return rec;
@ -850,6 +860,10 @@ public class AvroTypeUtil {
throw new IllegalTypeConversionException(rawValue + " is not a possible value of the ENUM" + enums + ".");
}
case STRING:
if (rawValue instanceof String) {
return rawValue;
}
return DataTypeUtils.toString(rawValue, (String) null, charset);
}
@ -913,12 +927,19 @@ public class AvroTypeUtil {
// we will have two possible types, and one of them will be null. When this happens, we can be much more efficient by simply
// determining the non-null type and converting to that.
final List<Schema> schemaTypes = fieldSchema.getTypes();
if (schemaTypes.size() == 2 && (schemaTypes.get(0).getType() == Type.NULL || schemaTypes.get(1).getType() == Type.NULL)) {
final Schema nonNullType = schemaTypes.get(0).getType() == Type.NULL ? schemaTypes.get(1) : schemaTypes.get(0);
return conversion.apply(nonNullType);
if (schemaTypes.size() == 2) {
final Schema firstSchema = schemaTypes.get(0);
final Schema secondSchema = schemaTypes.get(1);
if (firstSchema.getType() == Type.NULL) {
return conversion.apply(secondSchema);
}
if (secondSchema.getType() == Type.NULL) {
return conversion.apply(firstSchema);
}
}
Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
final Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
originalValue,
getNonNullSubSchemas(fieldSchema),
AvroTypeUtil::determineDataType

View File

@ -40,6 +40,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
@ -70,6 +71,41 @@ import static org.junit.Assert.fail;
public class TestAvroTypeUtil {
/**
 * Manual benchmark for {@code AvroTypeUtil.createAvroRecord(record, avroSchema)}.
 *
 * <p>Builds a record schema of 100 STRING fields, populates only the even-numbered
 * fields (so half of the values are left null), and then repeatedly converts the
 * record to Avro. The elapsed time of each batch of one million conversions is
 * printed in milliseconds so that runs before and after a change can be compared.
 *
 * <p>Ignored by default; remove {@code @Ignore} locally to run it.
 *
 * @throws IOException declared for the call to {@code AvroTypeUtil.createAvroRecord}
 */
@Test
@Ignore("Performance test meant for manually testing only before/after changes in order to measure performance difference caused by changes.")
public void testCreateAvroRecordPerformance() throws IOException {
    // Single source of truth for the number of fields, shared by schema and values.
    final int fieldCount = 100;

    final List<RecordField> fields = new ArrayList<>(fieldCount);
    for (int i = 0; i < fieldCount; i++) {
        fields.add(new RecordField("field" + i, RecordFieldType.STRING.getDataType(), true));
    }

    final RecordSchema recordSchema = new SimpleRecordSchema(fields);
    final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);

    final Map<String, Object> values = new HashMap<>();
    // Leave half of the values null: only even-numbered fields get a value.
    for (int i = 0; i < fieldCount; i += 2) {
        values.put("field" + i, String.valueOf(i));
    }

    final MapRecord record = new MapRecord(recordSchema, values);

    final int batches = 1_000;
    final int iterations = 1_000_000;
    final long nanosPerMilli = 1_000_000L;

    for (int batch = 0; batch < batches; batch++) {
        // System.nanoTime() is used instead of System.currentTimeMillis() because it is
        // intended for measuring elapsed time and is not affected by wall-clock adjustments.
        final long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            AvroTypeUtil.createAvroRecord(record, avroSchema);
        }

        final long millis = (System.nanoTime() - start) / nanosPerMilli;
        System.out.println(millis);
    }
}
@Test
public void testCreateAvroSchemaPrimitiveTypes() throws SchemaNotFoundException {
final List<RecordField> fields = new ArrayList<>();