NIFI-4232: Ensure that we handle conversions to Avro Arrays properly. Also, if unable to convert a value to the expected object, include in the log message the (fully qualified) name of the field that is problematic

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2040.
This commit is contained in:
Mark Payne 2017-07-26 13:04:10 -04:00 committed by Pierre Villard
parent dc4006f423
commit 1d6b486b63
1 changed files with 16 additions and 14 deletions

View File

@ -499,7 +499,7 @@ public class AvroTypeUtil {
final Map<String, Object> objectMap = (Map<String, Object>) rawValue; final Map<String, Object> objectMap = (Map<String, Object>) rawValue;
final Map<String, Object> map = new HashMap<>(objectMap.size()); final Map<String, Object> map = new HashMap<>(objectMap.size());
for (final String s : objectMap.keySet()) { for (final String s : objectMap.keySet()) {
final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName); final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName + "[" + s + "]");
map.put(s, converted); map.put(s, converted);
} }
return map; return map;
@ -519,18 +519,20 @@ public class AvroTypeUtil {
continue; continue;
} }
final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName); final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + recordFieldName);
avroRecord.put(recordFieldName, converted); avroRecord.put(recordFieldName, converted);
} }
return avroRecord; return avroRecord;
case UNION: case UNION:
return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName)); return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName), fieldName);
case ARRAY: case ARRAY:
final Object[] objectArray = (Object[]) rawValue; final Object[] objectArray = (Object[]) rawValue;
final List<Object> list = new ArrayList<>(objectArray.length); final List<Object> list = new ArrayList<>(objectArray.length);
int i = 0;
for (final Object o : objectArray) { for (final Object o : objectArray) {
final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName); final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName + "[" + i + "]");
list.add(converted); list.add(converted);
i++;
} }
return list; return list;
case BOOLEAN: case BOOLEAN:
@ -572,7 +574,7 @@ public class AvroTypeUtil {
} }
final Schema fieldSchema = avroField.schema(); final Schema fieldSchema = avroField.schema();
final Object rawValue = normalizeValue(value, fieldSchema); final Object rawValue = normalizeValue(value, fieldSchema, fieldName);
final DataType desiredType = recordField.getDataType(); final DataType desiredType = recordField.getDataType();
final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName); final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName);
@ -590,7 +592,7 @@ public class AvroTypeUtil {
* @param conversion the conversion function which takes a non-null field schema within the union field and returns a converted value * @param conversion the conversion function which takes a non-null field schema within the union field and returns a converted value
* @return a converted value * @return a converted value
*/ */
private static Object convertUnionFieldValue(Object originalValue, Schema fieldSchema, Function<Schema, Object> conversion) { private static Object convertUnionFieldValue(Object originalValue, Schema fieldSchema, Function<Schema, Object> conversion, final String fieldName) {
// Ignore null types in union // Ignore null types in union
final List<Schema> nonNullFieldSchemas = getNonNullSubSchemas(fieldSchema); final List<Schema> nonNullFieldSchemas = getNonNullSubSchemas(fieldSchema);
@ -618,7 +620,7 @@ public class AvroTypeUtil {
} }
throw new IllegalTypeConversionException("Cannot convert value " + originalValue + " of type " + originalValue.getClass() throw new IllegalTypeConversionException("Cannot convert value " + originalValue + " of type " + originalValue.getClass()
+ " because no compatible types exist in the UNION"); + " because no compatible types exist in the UNION for field " + fieldName);
} }
return null; return null;
} }
@ -640,7 +642,7 @@ public class AvroTypeUtil {
} }
break; break;
case ARRAY: case ARRAY:
if (value instanceof Array) { if (value instanceof Array || value instanceof List) {
return true; return true;
} }
break; break;
@ -654,7 +656,7 @@ public class AvroTypeUtil {
* Convert an Avro object to a normal Java objects for further processing. * Convert an Avro object to a normal Java objects for further processing.
* The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String)} * The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String)}
*/ */
private static Object normalizeValue(final Object value, final Schema avroSchema) { private static Object normalizeValue(final Object value, final Schema avroSchema, final String fieldName) {
if (value == null) { if (value == null) {
return null; return null;
} }
@ -696,9 +698,9 @@ public class AvroTypeUtil {
case UNION: case UNION:
if (value instanceof GenericData.Record) { if (value instanceof GenericData.Record) {
final GenericData.Record avroRecord = (GenericData.Record) value; final GenericData.Record avroRecord = (GenericData.Record) value;
return normalizeValue(value, avroRecord.getSchema()); return normalizeValue(value, avroRecord.getSchema(), fieldName);
} }
return convertUnionFieldValue(value, avroSchema, schema -> normalizeValue(value, schema)); return convertUnionFieldValue(value, avroSchema, schema -> normalizeValue(value, schema, fieldName), fieldName);
case RECORD: case RECORD:
final GenericData.Record record = (GenericData.Record) value; final GenericData.Record record = (GenericData.Record) value;
final Schema recordSchema = record.getSchema(); final Schema recordSchema = record.getSchema();
@ -706,7 +708,7 @@ public class AvroTypeUtil {
final Map<String, Object> values = new HashMap<>(recordFields.size()); final Map<String, Object> values = new HashMap<>(recordFields.size());
for (final Field field : recordFields) { for (final Field field : recordFields) {
final Object avroFieldValue = record.get(field.name()); final Object avroFieldValue = record.get(field.name());
final Object fieldValue = normalizeValue(avroFieldValue, field.schema()); final Object fieldValue = normalizeValue(avroFieldValue, field.schema(), fieldName + "/" + field.name());
values.put(field.name(), fieldValue); values.put(field.name(), fieldValue);
} }
final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema); final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema);
@ -732,7 +734,7 @@ public class AvroTypeUtil {
final Object[] valueArray = new Object[array.size()]; final Object[] valueArray = new Object[array.size()];
for (int i = 0; i < array.size(); i++) { for (int i = 0; i < array.size(); i++) {
final Schema elementSchema = avroSchema.getElementType(); final Schema elementSchema = avroSchema.getElementType();
valueArray[i] = normalizeValue(array.get(i), elementSchema); valueArray[i] = normalizeValue(array.get(i), elementSchema, fieldName + "[" + i + "]");
} }
return valueArray; return valueArray;
case MAP: case MAP:
@ -745,7 +747,7 @@ public class AvroTypeUtil {
} }
final String key = entry.getKey().toString(); final String key = entry.getKey().toString();
obj = normalizeValue(obj, avroSchema.getValueType()); obj = normalizeValue(obj, avroSchema.getValueType(), fieldName + "[" + key + "]");
map.put(key, obj); map.put(key, obj);
} }