mirror of https://github.com/apache/nifi.git
NIFI-10660: When converting Avro GenericRecord to a java Map, check GenericRecord's schema to see if field exists before calling Record.get().
NIFI-10660: Added unit tests; fixed bug to ensure that we use the desired field name when checking if it exists in schema Signed-off-by: Matthew Burgess <mattyb149@apache.org> This closes #6688
This commit is contained in:
parent
e172a3f224
commit
2126dbbe29
|
@ -20,7 +20,7 @@
|
|||
<artifactId>nifi-record-utils</artifactId>
|
||||
<version>1.19.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
|
||||
<artifactId>nifi-avro-record-utils</artifactId>
|
||||
|
||||
<dependencies>
|
||||
|
@ -75,7 +75,9 @@
|
|||
<exclude>src/test/resources/org/apache/nifi/avro/defaultArrayValue1.json</exclude>
|
||||
<exclude>src/test/resources/org/apache/nifi/avro/defaultArrayValue2.json</exclude>
|
||||
<exclude>src/test/resources/org/apache/nifi/avro/defaultArrayInRecords1.json</exclude>
|
||||
<exclude>src/test/resources/org/apache/nifi/avro/defaultArrayInRecords2.json</exclude>
|
||||
<exclude>src/test/resources/org/apache/nifi/avro/defaultArrayInRecords2.json</exclude>
|
||||
<exclude>src/test/resources/person.avsc</exclude>
|
||||
<exclude>src/test/resources/person-old-schema.avsc</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
|
@ -905,36 +905,45 @@ public class AvroTypeUtil {
|
|||
return convertAvroRecordToMap(avroRecord, recordSchema, StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static String getMatchingFieldName(final GenericRecord record, final RecordField field) {
|
||||
final Schema schema = record.getSchema();
|
||||
Field avroField = schema.getField(field.getFieldName());
|
||||
if (avroField != null) {
|
||||
return field.getFieldName();
|
||||
}
|
||||
|
||||
for (final String alias : field.getAliases()) {
|
||||
avroField = schema.getField(alias);
|
||||
if (avroField != null) {
|
||||
return alias;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema, final Charset charset) {
|
||||
final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());
|
||||
|
||||
for (final RecordField recordField : recordSchema.getFields()) {
|
||||
|
||||
Object value = avroRecord.get(recordField.getFieldName());
|
||||
if (value == null) {
|
||||
for (final String alias : recordField.getAliases()) {
|
||||
value = avroRecord.get(alias);
|
||||
if (value != null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
final String relevantFieldName = getMatchingFieldName(avroRecord, recordField);
|
||||
final Object value = (relevantFieldName == null) ? null : avroRecord.get(relevantFieldName);
|
||||
|
||||
final String fieldName = recordField.getFieldName();
|
||||
try {
|
||||
final Field avroField = avroRecord.getSchema().getField(fieldName);
|
||||
if (avroField == null) {
|
||||
values.put(fieldName, null);
|
||||
continue;
|
||||
}
|
||||
final Field avroField = avroRecord.getSchema().getField(relevantFieldName);
|
||||
if (avroField == null) {
|
||||
values.put(fieldName, null);
|
||||
continue;
|
||||
}
|
||||
|
||||
final Schema fieldSchema = avroField.schema();
|
||||
final Object rawValue = normalizeValue(value, fieldSchema, fieldName);
|
||||
final Schema fieldSchema = avroField.schema();
|
||||
final Object rawValue = normalizeValue(value, fieldSchema, fieldName);
|
||||
|
||||
final DataType desiredType = recordField.getDataType();
|
||||
final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName, charset);
|
||||
final DataType desiredType = recordField.getDataType();
|
||||
final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName, charset);
|
||||
|
||||
values.put(fieldName, coercedValue);
|
||||
values.put(fieldName, coercedValue);
|
||||
} catch (Exception ex) {
|
||||
logger.debug("fail to convert field " + fieldName, ex );
|
||||
throw ex;
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.nifi.serialization.record.type.RecordDataType;
|
|||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -483,20 +484,53 @@ public class TestAvroTypeUtil {
|
|||
|
||||
@Test
|
||||
public void testMapWithNullSchema() throws IOException {
|
||||
|
||||
Schema recursiveSchema = new Schema.Parser().parse(getClass().getResourceAsStream("schema.json"));
|
||||
|
||||
// Make sure the following doesn't throw an exception
|
||||
RecordSchema recordASchema = AvroTypeUtil.createSchema(recursiveSchema.getTypes().get(0));
|
||||
|
||||
// check the fix with the proper file
|
||||
try (DataFileStream<GenericRecord> r = new DataFileStream<>(getClass().getResourceAsStream("data.avro"),
|
||||
try (DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(getClass().getResourceAsStream("data.avro"),
|
||||
new GenericDatumReader<>())) {
|
||||
GenericRecord n = r.next();
|
||||
AvroTypeUtil.convertAvroRecordToMap(n, recordASchema, StandardCharsets.UTF_8);
|
||||
GenericRecord record = dataFileStream.next();
|
||||
AvroTypeUtil.convertAvroRecordToMap(record, recordASchema, StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertAvroRecordToMapWithAliasInSchema() throws IOException {
|
||||
final Schema personSchema = new Schema.Parser().parse(new File("src/test/resources/person.avsc"));
|
||||
final GenericRecord record = new GenericData.Record(personSchema);
|
||||
record.put("name", "John Doe");
|
||||
record.put("ssn", "111-11-1111");
|
||||
|
||||
final RecordSchema recordSchema = AvroTypeUtil.createSchema(personSchema);
|
||||
final Map<String, Object> map = AvroTypeUtil.convertAvroRecordToMap(record, recordSchema);
|
||||
assertEquals("John Doe", map.get("name"));
|
||||
assertEquals("111-11-1111", map.get("ssn"));
|
||||
assertNull(map.get("favoriteNumber"));
|
||||
assertNull(map.get("favNum"));
|
||||
assertNull(map.get("my favorite"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertAvroRecordToMapUsingAlias() throws IOException {
|
||||
final Schema personOldSchema = new Schema.Parser().parse(new File("src/test/resources/person-old-schema.avsc"));
|
||||
final GenericRecord record = new GenericData.Record(personOldSchema);
|
||||
record.put("name", "John Doe");
|
||||
record.put("id", "111-11-1111");
|
||||
record.put("favNum", 48);
|
||||
|
||||
final Schema personUpdatedSchema = new Schema.Parser().parse(new File("src/test/resources/person.avsc"));
|
||||
final RecordSchema recordSchema = AvroTypeUtil.createSchema(personUpdatedSchema);
|
||||
final Map<String, Object> map = AvroTypeUtil.convertAvroRecordToMap(record, recordSchema);
|
||||
assertEquals("John Doe", map.get("name"));
|
||||
assertEquals("111-11-1111", map.get("ssn"));
|
||||
assertEquals(48, map.get("favoriteNumber"));
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testToDecimalConversion() {
|
||||
final LogicalTypes.Decimal decimalType = LogicalTypes.decimal(26, 8);
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
{
|
||||
"name": "person",
|
||||
"namespace": "nifi",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "name", "type": "string" },
|
||||
{ "name": "favNum", "type": ["null", "int"]},
|
||||
{ "name": "id", "type": ["null", "string"]}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
{
|
||||
"name": "person",
|
||||
"namespace": "nifi",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "name", "type": "string" },
|
||||
{ "name": "favoriteNumber", "type": ["null", "int"], "aliases": ["favNum", "my favorite"] },
|
||||
{ "name": "ssn", "type": ["null", "string"], "aliases": ["id"] }
|
||||
]
|
||||
}
|
Loading…
Reference in New Issue