NIFI-8135 allow CHOICE data types in conversion of Records to Java Maps

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #7746
This commit is contained in:
Chris Sampson 2023-09-16 18:34:21 +01:00 committed by Matt Burgess
parent d4014c71ee
commit aac71c5aa1
7 changed files with 183 additions and 62 deletions

View File

@ -806,6 +806,7 @@ public class DataTypeUtils {
for (final Object key : original.keySet()) {
if (!(key instanceof String)) {
keysAreStrings = false;
break;
}
}
@ -854,79 +855,82 @@ public class DataTypeUtils {
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static Object convertRecordFieldtoObject(final Object value, final DataType dataType) {
if (value == null) {
return null;
}
DataType chosenDataType;
if (dataType instanceof ChoiceDataType) {
final DataType chosen = chooseDataType(value, (ChoiceDataType) dataType);
chosenDataType = chosen != null ? chosen : dataType;
} else {
chosenDataType = dataType;
}
if (value instanceof Record) {
Record record = (Record) value;
RecordSchema recordSchema = record.getSchema();
final Record record = (Record) value;
final RecordSchema recordSchema = record.getSchema();
if (recordSchema == null) {
throw new IllegalTypeConversionException("Cannot convert value of type Record to Map because Record does not have an associated Schema");
}
final Map<String, Object> recordMap = new LinkedHashMap<>();
for (RecordField field : recordSchema.getFields()) {
final DataType fieldDataType = field.getDataType();
final Map<String, Object> recordMap = new LinkedHashMap<>(record.getRawFieldNames().size(), 1);
for (final RecordField field : recordSchema.getFields()) {
final String fieldName = field.getFieldName();
Object fieldValue = record.getValue(fieldName);
final Object fieldValue = record.getValue(fieldName);
if (field.getDataType() instanceof ChoiceDataType) {
final DataType chosen = chooseDataType(fieldValue, (ChoiceDataType) field.getDataType());
chosenDataType = chosen != null ? chosen : field.getDataType();
} else {
chosenDataType = field.getDataType();
}
if (fieldValue == null) {
recordMap.put(fieldName, null);
} else if (isScalarValue(fieldDataType, fieldValue)) {
} else if (isScalarValue(chosenDataType, fieldValue)) {
recordMap.put(fieldName, fieldValue);
} else if (fieldDataType instanceof RecordDataType) {
} else if (chosenDataType instanceof RecordDataType) {
Record nestedRecord = (Record) fieldValue;
recordMap.put(fieldName, convertRecordFieldtoObject(nestedRecord, fieldDataType));
} else if (fieldDataType instanceof MapDataType) {
recordMap.put(fieldName, convertRecordMapToJavaMap((Map) fieldValue, ((MapDataType)fieldDataType).getValueType()));
} else if (fieldDataType instanceof ArrayDataType) {
recordMap.put(fieldName, convertRecordArrayToJavaArray((Object[])fieldValue, ((ArrayDataType) fieldDataType).getElementType()));
recordMap.put(fieldName, convertRecordFieldtoObject(nestedRecord, chosenDataType));
} else if (chosenDataType instanceof MapDataType) {
recordMap.put(fieldName, convertRecordMapToJavaMap((Map) fieldValue, ((MapDataType) chosenDataType).getValueType()));
} else if (chosenDataType instanceof ArrayDataType) {
recordMap.put(fieldName, convertRecordArrayToJavaArray((Object[]) fieldValue, ((ArrayDataType) chosenDataType).getElementType()));
} else {
throw new IllegalTypeConversionException("Cannot convert value [" + fieldValue + "] of type " + fieldDataType.toString()
throw new IllegalTypeConversionException("Cannot convert value [" + fieldValue + "] of type " + chosenDataType
+ " to Map for field " + fieldName + " because the type is not supported");
}
}
return recordMap;
} else if (value instanceof Map) {
return convertRecordMapToJavaMap((Map) value, ((MapDataType) dataType).getValueType());
} else if (dataType != null && isScalarValue(dataType, value)) {
return convertRecordMapToJavaMap((Map) value, ((MapDataType) chosenDataType).getValueType());
} else if (chosenDataType != null && isScalarValue(chosenDataType, value)) {
return value;
} else if (value instanceof Object[] && dataType instanceof ArrayDataType) {
} else if (value instanceof Object[] && chosenDataType instanceof ArrayDataType) {
// This is likely a Map whose values are represented as an array. Return a new array with each element converted to a Java object
return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) dataType).getElementType());
return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) chosenDataType).getElementType());
}
throw new IllegalTypeConversionException("Cannot convert value of class " + value.getClass().getName() + " because the type is not supported");
}
public static Map<String, Object> convertRecordMapToJavaMap(final Map<String, Object> map, DataType valueDataType) {
public static Map<String, Object> convertRecordMapToJavaMap(final Map<String, Object> map, final DataType valueDataType) {
if (map == null) {
return null;
}
Map<String, Object> resultMap = new LinkedHashMap<>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
final Map<String, Object> resultMap = new LinkedHashMap<>();
for (final Map.Entry<String, Object> entry : map.entrySet()) {
resultMap.put(entry.getKey(), convertRecordFieldtoObject(entry.getValue(), valueDataType));
}
return resultMap;
}
public static Object[] convertRecordArrayToJavaArray(final Object[] array, DataType elementDataType) {
if (array == null || array.length == 0 || isScalarValue(elementDataType, array[0])) {
public static Object[] convertRecordArrayToJavaArray(final Object[] array, final DataType elementDataType) {
if (array == null || array.length == 0 || Arrays.stream(array).allMatch(o -> isScalarValue(elementDataType, o))) {
return array;
} else {
// Must be an array of complex types, build an array of converted values
Object[] resultArray = new Object[array.length];
for (int i = 0; i < array.length; i++) {
resultArray[i] = convertRecordFieldtoObject(array[i], elementDataType);
}
return resultArray;
return Arrays.stream(array).map(o -> convertRecordFieldtoObject(o, elementDataType)).toArray();
}
}
@ -1089,7 +1093,7 @@ public class DataTypeUtils {
if(dataType.getEnums() != null && dataType.getEnums().contains(value)) {
return value.toString();
}
throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + dataType.toString() + " for field " + fieldName);
throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + dataType + " for field " + fieldName);
}
public static java.sql.Date toDate(final Object value, final Supplier<DateFormat> format, final String fieldName) {
@ -1933,11 +1937,7 @@ public class DataTypeUtils {
return true;
}
if (!Objects.equals(thisField.getDefaultValue(), otherField.getDefaultValue())) {
return true;
}
return false;
return !Objects.equals(thisField.getDefaultValue(), otherField.getDefaultValue());
}
public static RecordField merge(final RecordField thisField, final RecordField otherField) {

View File

@ -232,6 +232,30 @@ public class TestDataTypeUtils {
}
}
@Test
void testConvertRecordFieldToObjectWithNestedRecord() {
final Record record = DataTypeUtils.toRecord(new LinkedHashMap<String, Object>(){{
put("firstName", "John");
put("age", 30);
put("addresses", new Object[] {"some string", DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Fake Street"), "addresses")});
}}, "");
final Object obj = DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getDataType());
assertTrue(obj instanceof Map);
final Map<String, Object> map = (Map<String, Object>) obj;
assertEquals("John", map.get("firstName"));
assertEquals(30, map.get("age"));
assertTrue(map.get("addresses") instanceof Object[]);
final Object[] objArray = (Object[]) map.get("addresses");
assertEquals(2, objArray.length);
assertEquals("some string", objArray[0]);
assertTrue(objArray[1] instanceof Map);
final Map<String, Object> addressMap = (Map<String, Object>) objArray[1];
assertEquals("123 Fake Street", addressMap.get("address_1"));
}
@Test
@SuppressWarnings("unchecked")
public void testConvertRecordFieldToObject() {
@ -243,12 +267,18 @@ public class TestDataTypeUtils {
fields.add(new RecordField("noDefault", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType())));
fields.add(new RecordField("intField", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("intArray", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())));
fields.add(new RecordField("objArray", RecordFieldType.ARRAY.getArrayDataType(
RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType())
)));
fields.add(new RecordField("choiceArray", RecordFieldType.ARRAY.getArrayDataType(
RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))
)));
// Map of Records with Arrays
List<RecordField> nestedRecordFields = new ArrayList<>();
final List<RecordField> nestedRecordFields = new ArrayList<>();
nestedRecordFields.add(new RecordField("a", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())));
nestedRecordFields.add(new RecordField("b", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
RecordSchema nestedRecordSchema = new SimpleRecordSchema(nestedRecordFields);
final RecordSchema nestedRecordSchema = new SimpleRecordSchema(nestedRecordFields);
fields.add(new RecordField("complex", RecordFieldType.MAP.getMapDataType(RecordFieldType.RECORD.getRecordDataType(nestedRecordSchema))));
@ -257,6 +287,9 @@ public class TestDataTypeUtils {
values.put("noDefault", "world");
values.put("intField", 5);
values.put("intArray", new Integer[] {3,2,1});
values.put("objArray", new Object[] {3,"2","abc",1});
values.put("noChoiceArray", new Object[] {"foo","BAR"});
values.put("choiceArray", new Object[] {"foo",new Object[]{"bar","baz"}});
final Map<String, Object> complexValues = new HashMap<>();
final Map<String, Object> complexValueRecord1 = new HashMap<>();
@ -275,22 +308,38 @@ public class TestDataTypeUtils {
Object o = DataTypeUtils.convertRecordFieldtoObject(inputRecord, RecordFieldType.RECORD.getRecordDataType(schema));
assertTrue(o instanceof Map);
Map<String,Object> outputMap = (Map<String,Object>) o;
final Map<String,Object> outputMap = (Map<String,Object>) o;
assertEquals("hello", outputMap.get("defaultOfHello"));
assertEquals("world", outputMap.get("noDefault"));
o = outputMap.get("intField");
assertEquals(5,o);
o = outputMap.get("intArray");
assertTrue(o instanceof Integer[]);
Integer[] intArray = (Integer[])o;
final Integer[] intArray = (Integer[])o;
assertEquals(3, intArray.length);
assertEquals((Integer)3, intArray[0]);
o = outputMap.get("objArray");
assertTrue(o instanceof Object[]);
final Object[] objArray = (Object[])o;
assertEquals(4, objArray.length);
assertEquals(3, objArray[0]);
assertEquals("2", objArray[1]);
o = outputMap.get("choiceArray");
assertTrue(o instanceof Object[]);
final Object[] choiceArray = (Object[])o;
assertEquals(2, choiceArray.length);
assertEquals("foo", choiceArray[0]);
assertTrue(choiceArray[1] instanceof Object[]);
final Object[] strArray = (Object[]) choiceArray[1];
assertEquals(2, strArray.length);
assertEquals("bar", strArray[0]);
assertEquals("baz", strArray[1]);
o = outputMap.get("complex");
assertTrue(o instanceof Map);
Map<String,Object> nestedOutputMap = (Map<String,Object>)o;
final Map<String,Object> nestedOutputMap = (Map<String,Object>)o;
o = nestedOutputMap.get("complex1");
assertTrue(o instanceof Map);
Map<String,Object> complex1 = (Map<String,Object>)o;
final Map<String,Object> complex1 = (Map<String,Object>)o;
o = complex1.get("a");
assertTrue(o instanceof Integer[]);
assertEquals((Integer)2, ((Integer[])o)[1]);
@ -299,14 +348,13 @@ public class TestDataTypeUtils {
assertEquals((Integer)3, ((Integer[])o)[2]);
o = nestedOutputMap.get("complex2");
assertTrue(o instanceof Map);
Map<String,Object> complex2 = (Map<String,Object>)o;
final Map<String,Object> complex2 = (Map<String,Object>)o;
o = complex2.get("a");
assertTrue(o instanceof String[]);
assertEquals("hello", ((String[])o)[0]);
o = complex2.get("b");
assertTrue(o instanceof String[]);
assertEquals("4", ((String[])o)[1]);
}
@Test

View File

@ -111,6 +111,8 @@
<exclude>src/test/resources/TestJoltTransformRecord/cardrOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/defaultrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/defaultrOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/flattenSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/flattenedOutput.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/shiftrSpec.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/shiftrSpecMultipleOutputRecords.json</exclude>
<exclude>src/test/resources/TestJoltTransformRecord/sortrOutput.json</exclude>

View File

@ -18,9 +18,11 @@ package org.apache.nifi.processors.jolt.record;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
@ -690,6 +692,34 @@ public class TestJoltTransformRecord {
runner.assertNotValid();
}
@Test
public void testJoltComplexChoiceField() throws Exception {
final JsonTreeReader reader = new JsonTreeReader();
runner.addControllerService("reader", reader);
runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
runner.enableControllerService(reader);
runner.setProperty(JoltTransformRecord.RECORD_READER, "reader");
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
runner.setProperty(writer, "Pretty Print JSON", "true");
runner.enableControllerService(writer);
final String flattenSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/flattenSpec.json")));
runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec);
runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CHAINR);
final String inputJson = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/input.json")));
runner.enqueue(inputJson);
runner.run();
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0);
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/flattenedOutput.json"))),
new String(transformed.toByteArray()));
}
private static Stream<Arguments> getChainrArguments() {
return Stream.of(
Arguments.of(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"), "has no single line comments"),
@ -697,7 +727,6 @@ public class TestJoltTransformRecord {
}
private void generateTestData(int numRecords, final BiFunction<Integer, MockRecordParser, Void> recordGenerator) {
if (recordGenerator == null) {
final RecordSchema primarySchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("value", RecordFieldType.INT.getDataType())));
@ -734,8 +763,6 @@ public class TestJoltTransformRecord {
parser.addRecord(ratingRecord);
}
} else {
recordGenerator.apply(numRecords, parser);
}

View File

@ -0,0 +1,21 @@
[
{
"operation": "shift",
"spec": { "*": "record.&" }
},
{
"operation": "shift",
"spec": {
"record": {
"*": {
"$": "TValue[#2].name",
"@": "TValue[#2].value"
}
}
}
},
{
"operation": "default",
"spec": { "TValue[]": { "*": { "class": "unclass" } } }
}
]

View File

@ -0,0 +1,21 @@
[ {
"TValue" : [ {
"name" : "datetime",
"value" : "2023-10-06 20:36:09.937019+00:00",
"class" : "unclass"
}, {
"name" : "Eta",
"value" : "",
"class" : "unclass"
} ]
}, {
"TValue" : [ {
"name" : "datetime",
"value" : "2023-08-24 17:07:03.334170+00:00",
"class" : "unclass"
}, {
"name" : "Eta",
"value" : "{Day=15, Hour=6, Minute=0, Month=8}",
"class" : "unclass"
} ]
} ]

View File

@ -1,13 +1,15 @@
{
"rating": {
"primary": {
"value": 3
},
"series": {
"value": [5,4]
},
"quality": {
"value": 3
[
{
"datetime": "2023-10-06 20:36:09.937019+00:00",
"Eta": ""
},
{
"datetime": "2023-08-24 17:07:03.334170+00:00",
"Eta": {
"Day": 15,
"Hour": 6,
"Minute": 0,
"Month": 8
}
}
}
]