NIFI-7390 Covering Avro type conversion in case of map withing Record

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4256
This commit is contained in:
Bence Simon 2020-05-06 16:04:36 +02:00 committed by Matthew Burgess
parent 6e2f86d716
commit 66b175f405
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 52 additions and 1 deletions

View File

@ -747,7 +747,7 @@ public class AvroTypeUtil {
for (final RecordField recordField : recordValue.getSchema().getFields()) {
final Object v = recordValue.getValue(recordField);
if (v != null) {
map.put(recordField.getFieldName(), v);
map.put(recordField.getFieldName(), convertToAvroObject(v, fieldSchema.getValueType(), fieldName + "[" + recordField.getFieldName() + "]", charset));
}
}

View File

@ -33,6 +33,7 @@ import org.apache.avro.util.Utf8;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
@ -691,4 +692,54 @@ public class TestAvroTypeUtil {
// THEN
assertEquals(expected, actual);
}
@Test
public void testConvertNifiRecordIntoAvroRecord() throws IOException {
// given
final MapRecord nifiRecord = givenRecordContainingNumericMap();
final Schema avroSchema = givenAvroSchemaContainingNumericMap();
// when
final GenericRecord result = AvroTypeUtil.createAvroRecord(nifiRecord, avroSchema);
// then
final HashMap<String, Object> numbers = (HashMap<String, Object>) result.get("numbers");
Assert.assertTrue(Long.class.isInstance(numbers.get("number1")));
Assert.assertTrue(Long.class.isInstance(numbers.get("number2")));
}
private MapRecord givenRecordContainingNumericMap() {
final Map<String, Object> numberValues = new HashMap<>();
numberValues.put("number1", 123); // Intentionally an Integer as validation accepts it
numberValues.put("number2", 123L);
final List<RecordField> numberFields = Arrays.asList(
new RecordField("number1", RecordFieldType.LONG.getDataType()),
new RecordField("number2", RecordFieldType.LONG.getDataType())
);
final RecordSchema nifiNumberSchema = new SimpleRecordSchema(numberFields);
final MapRecord numberRecord = new MapRecord(new SimpleRecordSchema(numberFields), numberValues);
final Map<String, Object> values = new HashMap<>();
values.put("id", 1);
values.put("numbers", numberRecord);
final List<RecordField> fields = Arrays.asList(
new RecordField("id", RecordFieldType.INT.getDataType()),
new RecordField("numbers", RecordFieldType.RECORD.getRecordDataType(nifiNumberSchema))
);
return new MapRecord(new SimpleRecordSchema(fields), values);
}
private Schema givenAvroSchemaContainingNumericMap() {
final List<Field> avroFields = Arrays.asList(
new Field("id", Schema.create(Type.INT), "", ""),
new Field("numbers", Schema.createMap(Schema.create(Type.LONG)), "", "")
);
return Schema.createRecord(avroFields);
}
}