diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 87139c6ced..a39a7f4006 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -499,7 +499,7 @@ public class AvroTypeUtil { final Map objectMap = (Map) rawValue; final Map map = new HashMap<>(objectMap.size()); for (final String s : objectMap.keySet()) { - final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName); + final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName + "[" + s + "]"); map.put(s, converted); } return map; @@ -519,18 +519,20 @@ public class AvroTypeUtil { continue; } - final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName); + final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + recordFieldName); avroRecord.put(recordFieldName, converted); } return avroRecord; case UNION: - return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName)); + return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName), fieldName); case ARRAY: final Object[] objectArray = (Object[]) rawValue; final List list = new ArrayList<>(objectArray.length); + int i = 0; for (final Object o : objectArray) { - final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName); + final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName + "[" + i + "]"); list.add(converted); + i++; } return list; case BOOLEAN: @@ -572,7 +574,7 @@ public class AvroTypeUtil { } final Schema fieldSchema = avroField.schema(); - final Object rawValue = normalizeValue(value, fieldSchema); + final Object rawValue = normalizeValue(value, fieldSchema, fieldName); final DataType desiredType = recordField.getDataType(); final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName); @@ -590,7 +592,7 @@ public class AvroTypeUtil { * @param conversion the conversion function which takes a non-null field schema within the union field and returns a converted value * @return a converted value */ - private static Object convertUnionFieldValue(Object originalValue, Schema fieldSchema, Function conversion) { + private static Object convertUnionFieldValue(Object originalValue, Schema fieldSchema, Function conversion, final String fieldName) { // Ignore null types in union final List nonNullFieldSchemas = getNonNullSubSchemas(fieldSchema); @@ -618,7 +620,7 @@ public class AvroTypeUtil { } throw new IllegalTypeConversionException("Cannot convert value " + originalValue + " of type " + originalValue.getClass() - + " because no compatible types exist in the UNION"); + + " because no compatible types exist in the UNION for field " + fieldName); } return null; } @@ -640,7 +642,7 @@ public class AvroTypeUtil { } break; case ARRAY: - if (value instanceof Array) { + if (value instanceof Array || value instanceof List) { return true; } break; @@ -654,7 +656,7 @@ public class AvroTypeUtil { * Convert an Avro object to a normal Java objects for further processing. * The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String)} */ - private static Object normalizeValue(final Object value, final Schema avroSchema) { + private static Object normalizeValue(final Object value, final Schema avroSchema, final String fieldName) { if (value == null) { return null; } @@ -696,9 +698,9 @@ public class AvroTypeUtil { case UNION: if (value instanceof GenericData.Record) { final GenericData.Record avroRecord = (GenericData.Record) value; - return normalizeValue(value, avroRecord.getSchema()); + return normalizeValue(value, avroRecord.getSchema(), fieldName); } - return convertUnionFieldValue(value, avroSchema, schema -> normalizeValue(value, schema)); + return convertUnionFieldValue(value, avroSchema, schema -> normalizeValue(value, schema, fieldName), fieldName); case RECORD: final GenericData.Record record = (GenericData.Record) value; final Schema recordSchema = record.getSchema(); @@ -706,7 +708,7 @@ public class AvroTypeUtil { final Map values = new HashMap<>(recordFields.size()); for (final Field field : recordFields) { final Object avroFieldValue = record.get(field.name()); - final Object fieldValue = normalizeValue(avroFieldValue, field.schema()); + final Object fieldValue = normalizeValue(avroFieldValue, field.schema(), fieldName + "/" + field.name()); values.put(field.name(), fieldValue); } final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema); @@ -732,7 +734,7 @@ public class AvroTypeUtil { final Object[] valueArray = new Object[array.size()]; for (int i = 0; i < array.size(); i++) { final Schema elementSchema = avroSchema.getElementType(); - valueArray[i] = normalizeValue(array.get(i), elementSchema); + valueArray[i] = normalizeValue(array.get(i), elementSchema, fieldName + "[" + i + "]"); } return valueArray; case MAP: @@ -745,7 +747,7 @@ public class AvroTypeUtil { } final String key = entry.getKey().toString(); - obj = normalizeValue(obj, avroSchema.getValueType()); + obj = normalizeValue(obj, avroSchema.getValueType(), fieldName + "[" + key + "]"); map.put(key, obj); }