diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java index 49ff37f475..42efadc07d 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java @@ -191,20 +191,35 @@ public class IcebergRecordConverter { return GenericDataConverters.convertSchemaTypeToDataType(schemaFieldType); } final Optional recordField = recordType.getChildSchema().getField(mappedFieldName.get()); - final RecordField field = recordField.get(); + final DataType fieldType = recordField.get().getDataType(); // If the actual record contains a nested record then we need to create a RecordTypeWithFieldNameMapper wrapper object for it. 
- if (field.getDataType() instanceof RecordDataType) { - return new RecordTypeWithFieldNameMapper(new Schema(schema.findField(fieldId).type().asStructType().fields()), (RecordDataType) field.getDataType()); + if (fieldType instanceof RecordDataType) { + return new RecordTypeWithFieldNameMapper(new Schema(schema.findField(fieldId).type().asStructType().fields()), (RecordDataType) fieldType); + } + + // If the field is an Array, and it contains Records then add the record's iceberg schema for creating RecordTypeWithFieldNameMapper + if (fieldType instanceof ArrayDataType && ((ArrayDataType) fieldType).getElementType() instanceof RecordDataType) { + return new ArrayTypeWithIcebergSchema( + new Schema(schema.findField(fieldId).type().asListType().elementType().asStructType().fields()), + ((ArrayDataType) fieldType).getElementType() + ); + } + + // If the field is a Map, and its value field contains Records then add the record's iceberg schema for creating RecordTypeWithFieldNameMapper + if (fieldType instanceof MapDataType && ((MapDataType) fieldType).getValueType() instanceof RecordDataType) { + return new MapTypeWithIcebergSchema( + new Schema(schema.findField(fieldId).type().asMapType().valueType().asStructType().fields()), + ((MapDataType) fieldType).getValueType() + ); + } // If the source field or target field is of type UUID, create a UUIDDataType from it - if (field.getDataType().getFieldType().equals(RecordFieldType.UUID) - || schema.findField(fieldId).type().typeId() == Type.TypeID.UUID) { - return new UUIDDataType(field.getDataType(), fileFormat); + if (fieldType.getFieldType().equals(RecordFieldType.UUID) || schema.findField(fieldId).type().typeId() == Type.TypeID.UUID) { + return new UUIDDataType(fieldType, fileFormat); } - return field.getDataType(); + return fieldType; } @Override @@ -216,6 +231,10 @@ public class IcebergRecordConverter { public DataType mapValuePartner(DataType dataType) { Validate.isTrue(dataType instanceof MapDataType, 
String.format("Invalid map: %s is not a map", dataType)); final MapDataType mapType = (MapDataType) dataType; + if (mapType instanceof MapTypeWithIcebergSchema) { + MapTypeWithIcebergSchema typeWithSchema = (MapTypeWithIcebergSchema) mapType; + return new RecordTypeWithFieldNameMapper(typeWithSchema.getValueSchema(), (RecordDataType) typeWithSchema.getValueType()); + } return mapType.getValueType(); } @@ -223,6 +242,10 @@ public class IcebergRecordConverter { public DataType listElementPartner(DataType dataType) { Validate.isTrue(dataType instanceof ArrayDataType, String.format("Invalid array: %s is not an array", dataType)); final ArrayDataType arrayType = (ArrayDataType) dataType; + if (arrayType instanceof ArrayTypeWithIcebergSchema) { + ArrayTypeWithIcebergSchema typeWithSchema = (ArrayTypeWithIcebergSchema) arrayType; + return new RecordTypeWithFieldNameMapper(typeWithSchema.getElementSchema(), (RecordDataType) typeWithSchema.getElementType()); + } return arrayType.getElementType(); } } @@ -268,4 +291,38 @@ public class IcebergRecordConverter { } } + /** + * Data type for Arrays which contain Records. The class stores the iceberg schema for the element type. + */ + private static class ArrayTypeWithIcebergSchema extends ArrayDataType { + + private final Schema elementSchema; + + public ArrayTypeWithIcebergSchema(Schema elementSchema, DataType elementType) { + super(elementType); + this.elementSchema = elementSchema; + } + + public Schema getElementSchema() { + return elementSchema; + } + } + + /** + * Data type for Maps which contain Records in the entry values. The class stores the iceberg schema for the value type. 
+ */ + private static class MapTypeWithIcebergSchema extends MapDataType { + + private final Schema valueSchema; + + public MapTypeWithIcebergSchema(Schema valueSchema, DataType valueType) { + super(valueType); + this.valueSchema = valueSchema; + } + + public Schema getValueSchema() { + return valueSchema; + } + } + } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java index b33090f042..abfc65c566 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java @@ -72,7 +72,9 @@ import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -134,6 +136,22 @@ public class TestIcebergRecordConverter { )) ); + private static final Schema RECORD_IN_LIST_SCHEMA = new Schema( + Types.NestedField.required(0, "list", Types.ListType.ofRequired( + 1, Types.StructType.of( + Types.NestedField.required(2, "string", Types.StringType.get()), + Types.NestedField.required(3, "integer", Types.IntegerType.get()))) + ) + ); + + private static final Schema RECORD_IN_MAP_SCHEMA = new Schema( + Types.NestedField.required(0, "map", Types.MapType.ofRequired( + 1, 2, Types.StringType.get(), Types.StructType.of( + Types.NestedField.required(3, "string", Types.StringType.get()), + Types.NestedField.required(4, "integer", Types.IntegerType.get()))) + ) + ); + private static final Schema PRIMITIVES_SCHEMA = 
new Schema( Types.NestedField.optional(0, "string", Types.StringType.get()), Types.NestedField.optional(1, "integer", Types.IntegerType.get()), @@ -244,6 +262,22 @@ public class TestIcebergRecordConverter { return new SimpleRecordSchema(fields); } + private static RecordSchema getRecordInListSchema() { + List fields = new ArrayList<>(); + fields.add(new RecordField("list", new ArrayDataType( + new RecordDataType(getNestedStructSchema2())))); + + return new SimpleRecordSchema(fields); + } + + private static RecordSchema getRecordInMapSchema() { + List fields = new ArrayList<>(); + fields.add(new RecordField("map", new MapDataType( + new RecordDataType(getNestedStructSchema2())))); + + return new SimpleRecordSchema(fields); + } + private static RecordSchema getPrimitivesSchema() { List fields = new ArrayList<>(); fields.add(new RecordField("string", RecordFieldType.STRING.getDataType())); @@ -387,6 +421,34 @@ public class TestIcebergRecordConverter { return new MapRecord(getMapSchema(), values); } + private static Record setupRecordInListTestRecord() { + Map struct1 = new HashMap<>(); + struct1.put("string", "Test String 1"); + struct1.put("integer", 10); + + Map struct2 = new HashMap<>(); + struct2.put("string", "Test String 2"); + struct2.put("integer", 20); + + return new MapRecord(getRecordInListSchema(), Collections.singletonMap("list", Arrays.asList(struct1, struct2))); + } + + private static Record setupRecordInMapTestRecord() { + Map struct1 = new HashMap<>(); + struct1.put("string", "Test String 1"); + struct1.put("integer", 10); + + Map struct2 = new HashMap<>(); + struct2.put("string", "Test String 2"); + struct2.put("integer", 20); + + Map> map = new HashMap<>(); + map.put("key1", struct1); + map.put("key2", struct2); + + return new MapRecord(getRecordInMapSchema(), Collections.singletonMap("map", map)); + } + private static Record setupPrimitivesTestRecord() { LocalDate localDate = LocalDate.of(2017, 4, 4); LocalTime localTime = LocalTime.of(14, 20, 33); @@ 
-802,6 +864,78 @@ public class TestIcebergRecordConverter { assertEquals(42L, baseMap.get("nested_key")); } + @DisabledOnOs(WINDOWS) + @Test + public void testRecordInList() throws IOException { + RecordSchema nifiSchema = getRecordInListSchema(); + Record record = setupRecordInListTestRecord(); + final FileFormat format = FileFormat.AVRO; + + IcebergRecordConverter recordConverter = new IcebergRecordConverter(RECORD_IN_LIST_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger); + GenericRecord genericRecord = recordConverter.convert(record); + + writeTo(format, RECORD_IN_LIST_SCHEMA, genericRecord, tempFile); + + List results = readFrom(format, RECORD_IN_LIST_SCHEMA, tempFile.toInputFile()); + + assertEquals(1, results.size()); + assertInstanceOf(GenericRecord.class, results.get(0)); + GenericRecord resultRecord = results.get(0); + + assertEquals(1, resultRecord.size()); + assertInstanceOf(List.class, resultRecord.get(0)); + List fieldList = resultRecord.get(0, List.class); + + assertEquals(2, fieldList.size()); + assertInstanceOf(GenericRecord.class, fieldList.get(0)); + assertInstanceOf(GenericRecord.class, fieldList.get(1)); + + GenericRecord record1 = (GenericRecord) fieldList.get(0); + GenericRecord record2 = (GenericRecord) fieldList.get(1); + + assertEquals("Test String 1", record1.get(0, String.class)); + assertEquals(Integer.valueOf(10), record1.get(1, Integer.class)); + + assertEquals("Test String 2", record2.get(0, String.class)); + assertEquals(Integer.valueOf(20), record2.get(1, Integer.class)); + } + + @DisabledOnOs(WINDOWS) + @Test + public void testRecordInMap() throws IOException { + RecordSchema nifiSchema = getRecordInMapSchema(); + Record record = setupRecordInMapTestRecord(); + final FileFormat format = FileFormat.ORC; + + IcebergRecordConverter recordConverter = new IcebergRecordConverter(RECORD_IN_MAP_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger); + GenericRecord 
genericRecord = recordConverter.convert(record); + + writeTo(format, RECORD_IN_MAP_SCHEMA, genericRecord, tempFile); + + List results = readFrom(format, RECORD_IN_MAP_SCHEMA, tempFile.toInputFile()); + + assertEquals(1, results.size()); + assertInstanceOf(GenericRecord.class, results.get(0)); + GenericRecord resultRecord = results.get(0); + + assertEquals(1, resultRecord.size()); + assertInstanceOf(Map.class, resultRecord.get(0)); + Map recordMap = resultRecord.get(0, Map.class); + + assertEquals(2, recordMap.size()); + assertInstanceOf(GenericRecord.class, recordMap.get("key1")); + assertInstanceOf(GenericRecord.class, recordMap.get("key2")); + + GenericRecord record1 = (GenericRecord) recordMap.get("key1"); + GenericRecord record2 = (GenericRecord) recordMap.get("key2"); + + assertEquals("Test String 1", record1.get(0, String.class)); + assertEquals(Integer.valueOf(10), record1.get(1, Integer.class)); + + assertEquals("Test String 2", record2.get(0, String.class)); + assertEquals(Integer.valueOf(20), record2.get(1, Integer.class)); + } + @DisabledOnOs(WINDOWS) @ParameterizedTest @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})