From aac71c5aa13d7ba6414d0301252a856068f9e77d Mon Sep 17 00:00:00 2001 From: Chris Sampson Date: Sat, 16 Sep 2023 18:34:21 +0100 Subject: [PATCH] NIFI-8135 allow CHOICE data types in conversion of Records to Java Maps Signed-off-by: Matt Burgess This closes #7746 --- .../record/util/DataTypeUtils.java | 80 +++++++++---------- .../record/TestDataTypeUtils.java | 64 +++++++++++++-- .../nifi-jolt-record-processors/pom.xml | 2 + .../jolt/record/TestJoltTransformRecord.java | 33 +++++++- .../TestJoltTransformRecord/flattenSpec.json | 21 +++++ .../flattenedOutput.json | 21 +++++ .../TestJoltTransformRecord/input.json | 24 +++--- 7 files changed, 183 insertions(+), 62 deletions(-) create mode 100644 nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenSpec.json create mode 100644 nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenedOutput.json diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 7464b28ee9..1d249eebd8 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -806,6 +806,7 @@ public class DataTypeUtils { for (final Object key : original.keySet()) { if (!(key instanceof String)) { keysAreStrings = false; + break; } } @@ -854,79 +855,82 @@ public class DataTypeUtils { */ @SuppressWarnings({"unchecked", "rawtypes"}) public static Object convertRecordFieldtoObject(final Object value, final DataType dataType) { - if (value == null) { return null; } + DataType chosenDataType; + if (dataType instanceof ChoiceDataType) { + final DataType chosen = chooseDataType(value, (ChoiceDataType) dataType); + chosenDataType = chosen != null ? chosen : dataType; + } else { + chosenDataType = dataType; + } + if (value instanceof Record) { - Record record = (Record) value; - RecordSchema recordSchema = record.getSchema(); + final Record record = (Record) value; + final RecordSchema recordSchema = record.getSchema(); if (recordSchema == null) { throw new IllegalTypeConversionException("Cannot convert value of type Record to Map because Record does not have an associated Schema"); } - final Map recordMap = new LinkedHashMap<>(); - for (RecordField field : recordSchema.getFields()) { - final DataType fieldDataType = field.getDataType(); + final Map recordMap = new LinkedHashMap<>(record.getRawFieldNames().size(), 1); + for (final RecordField field : recordSchema.getFields()) { final String fieldName = field.getFieldName(); - Object fieldValue = record.getValue(fieldName); + final Object fieldValue = record.getValue(fieldName); + if (field.getDataType() instanceof ChoiceDataType) { + final DataType chosen = chooseDataType(fieldValue, (ChoiceDataType) field.getDataType()); + chosenDataType = chosen != null ? chosen : field.getDataType(); + } else { + chosenDataType = field.getDataType(); + } if (fieldValue == null) { recordMap.put(fieldName, null); - } else if (isScalarValue(fieldDataType, fieldValue)) { + } else if (isScalarValue(chosenDataType, fieldValue)) { recordMap.put(fieldName, fieldValue); - } else if (fieldDataType instanceof RecordDataType) { + } else if (chosenDataType instanceof RecordDataType) { Record nestedRecord = (Record) fieldValue; - recordMap.put(fieldName, convertRecordFieldtoObject(nestedRecord, fieldDataType)); - } else if (fieldDataType instanceof MapDataType) { - recordMap.put(fieldName, convertRecordMapToJavaMap((Map) fieldValue, ((MapDataType)fieldDataType).getValueType())); - - } else if (fieldDataType instanceof ArrayDataType) { - recordMap.put(fieldName, convertRecordArrayToJavaArray((Object[])fieldValue, ((ArrayDataType) fieldDataType).getElementType())); + recordMap.put(fieldName, convertRecordFieldtoObject(nestedRecord, chosenDataType)); + } else if (chosenDataType instanceof MapDataType) { + recordMap.put(fieldName, convertRecordMapToJavaMap((Map) fieldValue, ((MapDataType) chosenDataType).getValueType())); + } else if (chosenDataType instanceof ArrayDataType) { + recordMap.put(fieldName, convertRecordArrayToJavaArray((Object[]) fieldValue, ((ArrayDataType) chosenDataType).getElementType())); } else { - throw new IllegalTypeConversionException("Cannot convert value [" + fieldValue + "] of type " + fieldDataType.toString() + throw new IllegalTypeConversionException("Cannot convert value [" + fieldValue + "] of type " + chosenDataType + " to Map for field " + fieldName + " because the type is not supported"); } } return recordMap; } else if (value instanceof Map) { - return convertRecordMapToJavaMap((Map) value, ((MapDataType) dataType).getValueType()); - } else if (dataType != null && isScalarValue(dataType, value)) { + return convertRecordMapToJavaMap((Map) value, ((MapDataType) chosenDataType).getValueType()); + } else if (chosenDataType != null && isScalarValue(chosenDataType, value)) { return value; - } else if (value instanceof Object[] && dataType instanceof ArrayDataType) { + } else if (value instanceof Object[] && chosenDataType instanceof ArrayDataType) { // This is likely a Map whose values are represented as an array. Return a new array with each element converted to a Java object - return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) dataType).getElementType()); + return convertRecordArrayToJavaArray((Object[]) value, ((ArrayDataType) chosenDataType).getElementType()); } throw new IllegalTypeConversionException("Cannot convert value of class " + value.getClass().getName() + " because the type is not supported"); } - - public static Map convertRecordMapToJavaMap(final Map map, DataType valueDataType) { - + public static Map convertRecordMapToJavaMap(final Map map, final DataType valueDataType) { if (map == null) { return null; } - Map resultMap = new LinkedHashMap<>(); - for (Map.Entry entry : map.entrySet()) { + final Map resultMap = new LinkedHashMap<>(); + for (final Map.Entry entry : map.entrySet()) { resultMap.put(entry.getKey(), convertRecordFieldtoObject(entry.getValue(), valueDataType)); } return resultMap; } - public static Object[] convertRecordArrayToJavaArray(final Object[] array, DataType elementDataType) { - - if (array == null || array.length == 0 || isScalarValue(elementDataType, array[0])) { + public static Object[] convertRecordArrayToJavaArray(final Object[] array, final DataType elementDataType) { + if (array == null || array.length == 0 || Arrays.stream(array).allMatch(o -> isScalarValue(elementDataType, o))) { return array; } else { - // Must be an array of complex types, build an array of converted values - Object[] resultArray = new Object[array.length]; - for (int i = 0; i < array.length; i++) { - resultArray[i] = convertRecordFieldtoObject(array[i], elementDataType); - } - return resultArray; + return Arrays.stream(array).map(o -> convertRecordFieldtoObject(o, elementDataType)).toArray(); } } @@ -1089,7 +1093,7 @@ public class DataTypeUtils { if(dataType.getEnums() != null && dataType.getEnums().contains(value)) { return value.toString(); } - throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + dataType.toString() + " for field " + fieldName); + throw new IllegalTypeConversionException("Cannot convert value " + value + " of type " + dataType + " for field " + fieldName); } public static java.sql.Date toDate(final Object value, final Supplier format, final String fieldName) { @@ -1933,11 +1937,7 @@ public class DataTypeUtils { return true; } - if (!Objects.equals(thisField.getDefaultValue(), otherField.getDefaultValue())) { - return true; - } - - return false; + return !Objects.equals(thisField.getDefaultValue(), otherField.getDefaultValue()); } public static RecordField merge(final RecordField thisField, final RecordField otherField) { diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java index 16a1a64128..7b11fd38bc 100644 --- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java +++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestDataTypeUtils.java @@ -232,6 +232,30 @@ public class TestDataTypeUtils { } } + @Test + void testConvertRecordFieldToObjectWithNestedRecord() { + final Record record = DataTypeUtils.toRecord(new LinkedHashMap(){{ + put("firstName", "John"); + put("age", 30); + put("addresses", new Object[] {"some string", DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Fake Street"), "addresses")}); + }}, ""); + + final Object obj = DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getDataType()); + assertTrue(obj instanceof Map); + final Map map = (Map) obj; + assertEquals("John", map.get("firstName")); + assertEquals(30, map.get("age")); + + assertTrue(map.get("addresses") instanceof Object[]); + final Object[] objArray = (Object[]) map.get("addresses"); + assertEquals(2, objArray.length); + assertEquals("some string", objArray[0]); + + assertTrue(objArray[1] instanceof Map); + final Map addressMap = (Map) objArray[1]; + assertEquals("123 Fake Street", addressMap.get("address_1")); + } + @Test @SuppressWarnings("unchecked") public void testConvertRecordFieldToObject() { @@ -243,12 +267,18 @@ public class TestDataTypeUtils { fields.add(new RecordField("noDefault", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType()))); fields.add(new RecordField("intField", RecordFieldType.INT.getDataType())); fields.add(new RecordField("intArray", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()))); + fields.add(new RecordField("objArray", RecordFieldType.ARRAY.getArrayDataType( + RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType()) + ))); + fields.add(new RecordField("choiceArray", RecordFieldType.ARRAY.getArrayDataType( + RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())) + ))); // Map of Records with Arrays - List nestedRecordFields = new ArrayList<>(); + final List nestedRecordFields = new ArrayList<>(); nestedRecordFields.add(new RecordField("a", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType()))); nestedRecordFields.add(new RecordField("b", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))); - RecordSchema nestedRecordSchema = new SimpleRecordSchema(nestedRecordFields); + final RecordSchema nestedRecordSchema = new SimpleRecordSchema(nestedRecordFields); fields.add(new RecordField("complex", RecordFieldType.MAP.getMapDataType(RecordFieldType.RECORD.getRecordDataType(nestedRecordSchema)))); @@ -257,6 +287,9 @@ public class TestDataTypeUtils { values.put("noDefault", "world"); values.put("intField", 5); values.put("intArray", new Integer[] {3,2,1}); + values.put("objArray", new Object[] {3,"2","abc",1}); + values.put("noChoiceArray", new Object[] {"foo","BAR"}); + values.put("choiceArray", new Object[] {"foo",new Object[]{"bar","baz"}}); final Map complexValues = new HashMap<>(); final Map complexValueRecord1 = new HashMap<>(); @@ -275,22 +308,38 @@ public class TestDataTypeUtils { Object o = DataTypeUtils.convertRecordFieldtoObject(inputRecord, RecordFieldType.RECORD.getRecordDataType(schema)); assertTrue(o instanceof Map); - Map outputMap = (Map) o; + final Map outputMap = (Map) o; assertEquals("hello", outputMap.get("defaultOfHello")); assertEquals("world", outputMap.get("noDefault")); o = outputMap.get("intField"); assertEquals(5,o); o = outputMap.get("intArray"); assertTrue(o instanceof Integer[]); - Integer[] intArray = (Integer[])o; + final Integer[] intArray = (Integer[])o; assertEquals(3, intArray.length); assertEquals((Integer)3, intArray[0]); + o = outputMap.get("objArray"); + assertTrue(o instanceof Object[]); + final Object[] objArray = (Object[])o; + assertEquals(4, objArray.length); + assertEquals(3, objArray[0]); + assertEquals("2", objArray[1]); + o = outputMap.get("choiceArray"); + assertTrue(o instanceof Object[]); + final Object[] choiceArray = (Object[])o; + assertEquals(2, choiceArray.length); + assertEquals("foo", choiceArray[0]); + assertTrue(choiceArray[1] instanceof Object[]); + final Object[] strArray = (Object[]) choiceArray[1]; + assertEquals(2, strArray.length); + assertEquals("bar", strArray[0]); + assertEquals("baz", strArray[1]); o = outputMap.get("complex"); assertTrue(o instanceof Map); - Map nestedOutputMap = (Map)o; + final Map nestedOutputMap = (Map)o; o = nestedOutputMap.get("complex1"); assertTrue(o instanceof Map); - Map complex1 = (Map)o; + final Map complex1 = (Map)o; o = complex1.get("a"); assertTrue(o instanceof Integer[]); assertEquals((Integer)2, ((Integer[])o)[1]); @@ -299,14 +348,13 @@ public class TestDataTypeUtils { assertEquals((Integer)3, ((Integer[])o)[2]); o = nestedOutputMap.get("complex2"); assertTrue(o instanceof Map); - Map complex2 = (Map)o; + final Map complex2 = (Map)o; o = complex2.get("a"); assertTrue(o instanceof String[]); assertEquals("hello", ((String[])o)[0]); o = complex2.get("b"); assertTrue(o instanceof String[]); assertEquals("4", ((String[])o)[1]); - } @Test diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml index c4f955a45f..cf8f3065ee 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml @@ -111,6 +111,8 @@ src/test/resources/TestJoltTransformRecord/cardrOutput.json src/test/resources/TestJoltTransformRecord/defaultrSpec.json src/test/resources/TestJoltTransformRecord/defaultrOutput.json + src/test/resources/TestJoltTransformRecord/flattenSpec.json + src/test/resources/TestJoltTransformRecord/flattenedOutput.json src/test/resources/TestJoltTransformRecord/shiftrSpec.json src/test/resources/TestJoltTransformRecord/shiftrSpecMultipleOutputRecords.json src/test/resources/TestJoltTransformRecord/sortrOutput.json diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java index f887fc3f69..212cc0cf9f 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java @@ -18,9 +18,11 @@ package org.apache.nifi.processors.jolt.record; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processor.Relationship; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; @@ -690,6 +692,34 @@ public class TestJoltTransformRecord { runner.assertNotValid(); } + @Test + public void testJoltComplexChoiceField() throws Exception { + final JsonTreeReader reader = new JsonTreeReader(); + runner.addControllerService("reader", reader); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); + runner.enableControllerService(reader); + runner.setProperty(JoltTransformRecord.RECORD_READER, "reader"); + + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + + final String flattenSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/flattenSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CHAINR); + + final String inputJson = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/input.json"))); + runner.enqueue(inputJson); + + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/flattenedOutput.json"))), + new String(transformed.toByteArray())); + } + private static Stream getChainrArguments() { return Stream.of( Arguments.of(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"), "has no single line comments"), @@ -697,7 +727,6 @@ public class TestJoltTransformRecord { } private void generateTestData(int numRecords, final BiFunction recordGenerator) { - if (recordGenerator == null) { final RecordSchema primarySchema = new SimpleRecordSchema(Arrays.asList( new RecordField("value", RecordFieldType.INT.getDataType()))); @@ -734,8 +763,6 @@ public class TestJoltTransformRecord { parser.addRecord(ratingRecord); } - - } else { recordGenerator.apply(numRecords, parser); } diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenSpec.json new file mode 100644 index 0000000000..64076a65ca --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenSpec.json @@ -0,0 +1,21 @@ +[ + { + "operation": "shift", + "spec": { "*": "record.&" } + }, + { + "operation": "shift", + "spec": { + "record": { + "*": { + "$": "TValue[#2].name", + "@": "TValue[#2].value" + } + } + } + }, + { + "operation": "default", + "spec": { "TValue[]": { "*": { "class": "unclass" } } } + } +] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenedOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenedOutput.json new file mode 100644 index 0000000000..eec23144df --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/flattenedOutput.json @@ -0,0 +1,21 @@ +[ { + "TValue" : [ { + "name" : "datetime", + "value" : "2023-10-06 20:36:09.937019+00:00", + "class" : "unclass" + }, { + "name" : "Eta", + "value" : "", + "class" : "unclass" + } ] +}, { + "TValue" : [ { + "name" : "datetime", + "value" : "2023-08-24 17:07:03.334170+00:00", + "class" : "unclass" + }, { + "name" : "Eta", + "value" : "{Day=15, Hour=6, Minute=0, Month=8}", + "class" : "unclass" + } ] +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json index 12d85dbe98..6c64a8539b 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json @@ -1,13 +1,15 @@ -{ - "rating": { - "primary": { - "value": 3 - }, - "series": { - "value": [5,4] - }, - "quality": { - "value": 3 +[ + { + "datetime": "2023-10-06 20:36:09.937019+00:00", + "Eta": "" + }, + { + "datetime": "2023-08-24 17:07:03.334170+00:00", + "Eta": { + "Day": 15, + "Hour": 6, + "Minute": 0, + "Month": 8 } } -} \ No newline at end of file +] \ No newline at end of file