diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index a399f6724a..fb6cdbdb46 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -204,6 +204,30 @@ public class DataTypeUtils { if (value == null) { return false; } + + // value may be a Map even when type is RECORD + if (value instanceof Map) { + final RecordSchema schema = ((RecordDataType) dataType).getChildSchema(); + if (schema == null) { + return true; + } + Map record = ((Map) value); + for (final RecordField childField : schema.getFields()) { + final Object childValue = record.get(childField.getFieldName()); + if (childValue == null && !childField.isNullable()) { + logger.debug("Value is not compatible with schema because field {} has a null value, which is not allowed in the schema", childField.getFieldName()); + return false; + } + if (childValue == null) { + continue; // consider compatible + } + + if (!isCompatibleDataType(childValue, childField.getDataType())) { + return false; + } + } + return true; + } if (!(value instanceof Record)) { return false; } @@ -687,6 +711,9 @@ public class DataTypeUtils { return convertRecordMapToJavaMap((Map) value, ((MapDataType) dataType).getValueType()); } else if (dataType != null && isScalarValue(dataType, value)) { return value; + } else if (value instanceof Object[] && dataType instanceof ArrayDataType) { + // This is likely a Map whose values are represented as an array. Return a new array with each element converted to a Java object + return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) dataType).getElementType()); } throw new IllegalTypeConversionException("Cannot convert value of class " + value.getClass().getName() + " because the type is not supported"); diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java index 45b65b4540..cef0eec859 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java @@ -107,6 +107,34 @@ public class TestDataTypeUtils { } } + @Test + public void testConvertArrayOfRecordsToJavaArray() { + final List fields = new ArrayList<>(); + fields.add(new RecordField("stringField", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("intField", RecordFieldType.INT.getDataType())); + + final RecordSchema schema = new SimpleRecordSchema(fields); + + final Map values1 = new HashMap<>(); + values1.put("stringField", "hello"); + values1.put("intField", 5); + final Record inputRecord1 = new MapRecord(schema, values1); + + final Map values2 = new HashMap<>(); + values2.put("stringField", "world"); + values2.put("intField", 50); + final Record inputRecord2 = new MapRecord(schema, values2); + + Object[] recordArray = {inputRecord1, inputRecord2}; + Object resultObj = DataTypeUtils.convertRecordFieldtoObject(recordArray, RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(schema))); + assertNotNull(resultObj); + assertTrue(resultObj instanceof Object[]); + Object[] resultArray = (Object[]) resultObj; + for(Object o : resultArray) { + assertTrue(o instanceof Map); + } + } + @Test @SuppressWarnings("unchecked") public void testConvertRecordFieldToObject() { @@ -252,4 +280,11 @@ public class TestDataTypeUtils { } } } + + @Test + public void testIsCompatibleDataTypeMap() { + Map testMap = new HashMap<>(); + testMap.put("Hello", "World"); + assertTrue(DataTypeUtils.isCompatibleDataType(testMap, RecordFieldType.RECORD.getDataType())); + } }