mirror of https://github.com/apache/nifi.git
NIFI-6105: Fix handling of arrays of records/maps in record utilities
This closes #3353. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
35d1cace08
commit
f91311da9d
|
@ -204,6 +204,30 @@ public class DataTypeUtils {
|
|||
if (value == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// value may be a Map even when type is RECORD
|
||||
if (value instanceof Map) {
|
||||
final RecordSchema schema = ((RecordDataType) dataType).getChildSchema();
|
||||
if (schema == null) {
|
||||
return true;
|
||||
}
|
||||
Map<String, Object> record = ((Map<String, Object>) value);
|
||||
for (final RecordField childField : schema.getFields()) {
|
||||
final Object childValue = record.get(childField.getFieldName());
|
||||
if (childValue == null && !childField.isNullable()) {
|
||||
logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName());
|
||||
return false;
|
||||
}
|
||||
if (childValue == null) {
|
||||
continue; // consider compatible
|
||||
}
|
||||
|
||||
if (!isCompatibleDataType(childValue, childField.getDataType())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (!(value instanceof Record)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -687,6 +711,9 @@ public class DataTypeUtils {
|
|||
return convertRecordMapToJavaMap((Map) value, ((MapDataType) dataType).getValueType());
|
||||
} else if (dataType != null && isScalarValue(dataType, value)) {
|
||||
return value;
|
||||
} else if (value instanceof Object[] && dataType instanceof ArrayDataType) {
|
||||
// This is likely a Map whose values are represented as an array. Return a new array with each element converted to a Java object
|
||||
return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) dataType).getElementType());
|
||||
}
|
||||
|
||||
throw new IllegalTypeConversionException("Cannot convert value of class " + value.getClass().getName() + " because the type is not supported");
|
||||
|
|
|
@ -107,6 +107,34 @@ public class TestDataTypeUtils {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertArrayOfRecordsToJavaArray() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("stringField", RecordFieldType.STRING.getDataType()));
|
||||
fields.add(new RecordField("intField", RecordFieldType.INT.getDataType()));
|
||||
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
final Map<String, Object> values1 = new HashMap<>();
|
||||
values1.put("stringField", "hello");
|
||||
values1.put("intField", 5);
|
||||
final Record inputRecord1 = new MapRecord(schema, values1);
|
||||
|
||||
final Map<String, Object> values2 = new HashMap<>();
|
||||
values2.put("stringField", "world");
|
||||
values2.put("intField", 50);
|
||||
final Record inputRecord2 = new MapRecord(schema, values2);
|
||||
|
||||
Object[] recordArray = {inputRecord1, inputRecord2};
|
||||
Object resultObj = DataTypeUtils.convertRecordFieldtoObject(recordArray, RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(schema)));
|
||||
assertNotNull(resultObj);
|
||||
assertTrue(resultObj instanceof Object[]);
|
||||
Object[] resultArray = (Object[]) resultObj;
|
||||
for(Object o : resultArray) {
|
||||
assertTrue(o instanceof Map);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testConvertRecordFieldToObject() {
|
||||
|
@ -252,4 +280,11 @@ public class TestDataTypeUtils {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsCompatibleDataTypeMap() {
|
||||
Map<String,Object> testMap = new HashMap<>();
|
||||
testMap.put("Hello", "World");
|
||||
assertTrue(DataTypeUtils.isCompatibleDataType(testMap, RecordFieldType.RECORD.getDataType()));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue