mirror of
https://github.com/apache/nifi.git
synced 2025-02-22 10:29:24 +00:00
NIFI-12596: PutIceberg is missing case-insensitive Record type handling in List and Map types
Signed-off-by: Matt Burgess <mattyb149@apache.org>
This commit is contained in:
parent
26c21bbdcc
commit
99ffcdb3bb
@ -191,20 +191,35 @@ public class IcebergRecordConverter {
|
||||
return GenericDataConverters.convertSchemaTypeToDataType(schemaFieldType);
|
||||
}
|
||||
final Optional<RecordField> recordField = recordType.getChildSchema().getField(mappedFieldName.get());
|
||||
final RecordField field = recordField.get();
|
||||
final DataType fieldType = recordField.get().getDataType();
|
||||
|
||||
// If the actual record contains a nested record then we need to create a RecordTypeWithFieldNameMapper wrapper object for it.
|
||||
if (field.getDataType() instanceof RecordDataType) {
|
||||
return new RecordTypeWithFieldNameMapper(new Schema(schema.findField(fieldId).type().asStructType().fields()), (RecordDataType) field.getDataType());
|
||||
if (fieldType instanceof RecordDataType) {
|
||||
return new RecordTypeWithFieldNameMapper(new Schema(schema.findField(fieldId).type().asStructType().fields()), (RecordDataType) fieldType);
|
||||
}
|
||||
|
||||
// If the field is an Array, and it contains Records then add the record's iceberg schema for creating RecordTypeWithFieldNameMapper
|
||||
if (fieldType instanceof ArrayDataType && ((ArrayDataType) fieldType).getElementType() instanceof RecordDataType) {
|
||||
return new ArrayTypeWithIcebergSchema(
|
||||
new Schema(schema.findField(fieldId).type().asListType().elementType().asStructType().fields()),
|
||||
((ArrayDataType) fieldType).getElementType()
|
||||
);
|
||||
}
|
||||
|
||||
// If the field is a Map, and it's value field contains Records then add the record's iceberg schema for creating RecordTypeWithFieldNameMapper
|
||||
if (fieldType instanceof MapDataType && ((MapDataType) fieldType).getValueType() instanceof RecordDataType) {
|
||||
return new MapTypeWithIcebergSchema(
|
||||
new Schema(schema.findField(fieldId).type().asMapType().valueType().asStructType().fields()),
|
||||
((MapDataType) fieldType).getValueType()
|
||||
);
|
||||
}
|
||||
|
||||
// If the source field or target field is of type UUID, create a UUIDDataType from it
|
||||
if (field.getDataType().getFieldType().equals(RecordFieldType.UUID)
|
||||
|| schema.findField(fieldId).type().typeId() == Type.TypeID.UUID) {
|
||||
return new UUIDDataType(field.getDataType(), fileFormat);
|
||||
if (fieldType.getFieldType().equals(RecordFieldType.UUID) || schema.findField(fieldId).type().typeId() == Type.TypeID.UUID) {
|
||||
return new UUIDDataType(fieldType, fileFormat);
|
||||
}
|
||||
|
||||
return field.getDataType();
|
||||
return fieldType;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -216,6 +231,10 @@ public class IcebergRecordConverter {
|
||||
public DataType mapValuePartner(DataType dataType) {
|
||||
Validate.isTrue(dataType instanceof MapDataType, String.format("Invalid map: %s is not a map", dataType));
|
||||
final MapDataType mapType = (MapDataType) dataType;
|
||||
if (mapType instanceof MapTypeWithIcebergSchema) {
|
||||
MapTypeWithIcebergSchema typeWithSchema = (MapTypeWithIcebergSchema) mapType;
|
||||
return new RecordTypeWithFieldNameMapper(typeWithSchema.getValueSchema(), (RecordDataType) typeWithSchema.getValueType());
|
||||
}
|
||||
return mapType.getValueType();
|
||||
}
|
||||
|
||||
@ -223,6 +242,10 @@ public class IcebergRecordConverter {
|
||||
public DataType listElementPartner(DataType dataType) {
|
||||
Validate.isTrue(dataType instanceof ArrayDataType, String.format("Invalid array: %s is not an array", dataType));
|
||||
final ArrayDataType arrayType = (ArrayDataType) dataType;
|
||||
if (arrayType instanceof ArrayTypeWithIcebergSchema) {
|
||||
ArrayTypeWithIcebergSchema typeWithSchema = (ArrayTypeWithIcebergSchema) arrayType;
|
||||
return new RecordTypeWithFieldNameMapper(typeWithSchema.getElementSchema(), (RecordDataType) typeWithSchema.getElementType());
|
||||
}
|
||||
return arrayType.getElementType();
|
||||
}
|
||||
}
|
||||
@ -268,4 +291,38 @@ public class IcebergRecordConverter {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Data type for Arrays which contains Records. The class stores the iceberg schema for the element type.
|
||||
*/
|
||||
private static class ArrayTypeWithIcebergSchema extends ArrayDataType {
|
||||
|
||||
private final Schema elementSchema;
|
||||
|
||||
public ArrayTypeWithIcebergSchema(Schema elementSchema, DataType elementType) {
|
||||
super(elementType);
|
||||
this.elementSchema = elementSchema;
|
||||
}
|
||||
|
||||
public Schema getElementSchema() {
|
||||
return elementSchema;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Data type for Maps which contains Records in the entries value. The class stores the iceberg schema for the value type.
|
||||
*/
|
||||
private static class MapTypeWithIcebergSchema extends MapDataType {
|
||||
|
||||
private final Schema valueSchema;
|
||||
|
||||
public MapTypeWithIcebergSchema(Schema valueSchema, DataType valueType) {
|
||||
super(valueType);
|
||||
this.valueSchema = valueSchema;
|
||||
}
|
||||
|
||||
public Schema getValueSchema() {
|
||||
return valueSchema;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -73,7 +73,9 @@ import java.time.OffsetDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -133,6 +135,22 @@ public class TestIcebergRecordConverter {
|
||||
))
|
||||
);
|
||||
|
||||
private static final Schema RECORD_IN_LIST_SCHEMA = new Schema(
|
||||
Types.NestedField.required(0, "list", Types.ListType.ofRequired(
|
||||
1, Types.StructType.of(
|
||||
Types.NestedField.required(2, "string", Types.StringType.get()),
|
||||
Types.NestedField.required(3, "integer", Types.IntegerType.get())))
|
||||
)
|
||||
);
|
||||
|
||||
private static final Schema RECORD_IN_MAP_SCHEMA = new Schema(
|
||||
Types.NestedField.required(0, "map", Types.MapType.ofRequired(
|
||||
1, 2, Types.StringType.get(), Types.StructType.of(
|
||||
Types.NestedField.required(3, "string", Types.StringType.get()),
|
||||
Types.NestedField.required(4, "integer", Types.IntegerType.get())))
|
||||
)
|
||||
);
|
||||
|
||||
private static final Schema PRIMITIVES_SCHEMA = new Schema(
|
||||
Types.NestedField.optional(0, "string", Types.StringType.get()),
|
||||
Types.NestedField.optional(1, "integer", Types.IntegerType.get()),
|
||||
@ -243,6 +261,22 @@ public class TestIcebergRecordConverter {
|
||||
return new SimpleRecordSchema(fields);
|
||||
}
|
||||
|
||||
private static RecordSchema getRecordInListSchema() {
|
||||
List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("list", new ArrayDataType(
|
||||
new RecordDataType(getNestedStructSchema2()))));
|
||||
|
||||
return new SimpleRecordSchema(fields);
|
||||
}
|
||||
|
||||
private static RecordSchema getRecordInMapSchema() {
|
||||
List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("map", new MapDataType(
|
||||
new RecordDataType(getNestedStructSchema2()))));
|
||||
|
||||
return new SimpleRecordSchema(fields);
|
||||
}
|
||||
|
||||
private static RecordSchema getPrimitivesSchema() {
|
||||
List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("string", RecordFieldType.STRING.getDataType()));
|
||||
@ -386,6 +420,34 @@ public class TestIcebergRecordConverter {
|
||||
return new MapRecord(getMapSchema(), values);
|
||||
}
|
||||
|
||||
private static Record setupRecordInListTestRecord() {
|
||||
Map<String, Object> struct1 = new HashMap<>();
|
||||
struct1.put("string", "Test String 1");
|
||||
struct1.put("integer", 10);
|
||||
|
||||
Map<String, Object> struct2 = new HashMap<>();
|
||||
struct2.put("string", "Test String 2");
|
||||
struct2.put("integer", 20);
|
||||
|
||||
return new MapRecord(getRecordInListSchema(), Collections.singletonMap("list", Arrays.asList(struct1, struct2)));
|
||||
}
|
||||
|
||||
private static Record setupRecordInMapTestRecord() {
|
||||
Map<String, Object> struct1 = new HashMap<>();
|
||||
struct1.put("string", "Test String 1");
|
||||
struct1.put("integer", 10);
|
||||
|
||||
Map<String, Object> struct2 = new HashMap<>();
|
||||
struct2.put("string", "Test String 2");
|
||||
struct2.put("integer", 20);
|
||||
|
||||
Map<String, Map<String, Object>> map = new HashMap<>();
|
||||
map.put("key1", struct1);
|
||||
map.put("key2", struct2);
|
||||
|
||||
return new MapRecord(getMapSchema(), Collections.singletonMap("map", map));
|
||||
}
|
||||
|
||||
private static Record setupPrimitivesTestRecord() {
|
||||
LocalDate localDate = LocalDate.of(2017, 4, 4);
|
||||
LocalTime localTime = LocalTime.of(14, 20, 33);
|
||||
@ -811,6 +873,78 @@ public class TestIcebergRecordConverter {
|
||||
assertEquals(42L, baseMap.get("nested_key"));
|
||||
}
|
||||
|
||||
@DisabledOnOs(WINDOWS)
|
||||
@Test
|
||||
public void testRecordInList() throws IOException {
|
||||
RecordSchema nifiSchema = getRecordInListSchema();
|
||||
Record record = setupRecordInListTestRecord();
|
||||
final FileFormat format = FileFormat.AVRO;
|
||||
|
||||
IcebergRecordConverter recordConverter = new IcebergRecordConverter(RECORD_IN_LIST_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
|
||||
GenericRecord genericRecord = recordConverter.convert(record);
|
||||
|
||||
writeTo(format, RECORD_IN_LIST_SCHEMA, genericRecord, tempFile);
|
||||
|
||||
List<GenericRecord> results = readFrom(format, RECORD_IN_LIST_SCHEMA, tempFile.toInputFile());
|
||||
|
||||
assertEquals(1, results.size());
|
||||
assertInstanceOf(GenericRecord.class, results.get(0));
|
||||
GenericRecord resultRecord = results.get(0);
|
||||
|
||||
assertEquals(1, resultRecord.size());
|
||||
assertInstanceOf(List.class, resultRecord.get(0));
|
||||
List<?> fieldList = resultRecord.get(0, List.class);
|
||||
|
||||
assertEquals(2, fieldList.size());
|
||||
assertInstanceOf(GenericRecord.class, fieldList.get(0));
|
||||
assertInstanceOf(GenericRecord.class, fieldList.get(1));
|
||||
|
||||
GenericRecord record1 = (GenericRecord) fieldList.get(0);
|
||||
GenericRecord record2 = (GenericRecord) fieldList.get(1);
|
||||
|
||||
assertEquals("Test String 1", record1.get(0, String.class));
|
||||
assertEquals(Integer.valueOf(10), record1.get(1, Integer.class));
|
||||
|
||||
assertEquals("Test String 2", record2.get(0, String.class));
|
||||
assertEquals(Integer.valueOf(20), record2.get(1, Integer.class));
|
||||
}
|
||||
|
||||
@DisabledOnOs(WINDOWS)
|
||||
@Test
|
||||
public void testRecordInMap() throws IOException {
|
||||
RecordSchema nifiSchema = getRecordInMapSchema();
|
||||
Record record = setupRecordInMapTestRecord();
|
||||
final FileFormat format = FileFormat.ORC;
|
||||
|
||||
IcebergRecordConverter recordConverter = new IcebergRecordConverter(RECORD_IN_MAP_SCHEMA, nifiSchema, format, UnmatchedColumnBehavior.IGNORE_UNMATCHED_COLUMN, logger);
|
||||
GenericRecord genericRecord = recordConverter.convert(record);
|
||||
|
||||
writeTo(format, RECORD_IN_MAP_SCHEMA, genericRecord, tempFile);
|
||||
|
||||
List<GenericRecord> results = readFrom(format, RECORD_IN_MAP_SCHEMA, tempFile.toInputFile());
|
||||
|
||||
assertEquals(1, results.size());
|
||||
assertInstanceOf(GenericRecord.class, results.get(0));
|
||||
GenericRecord resultRecord = results.get(0);
|
||||
|
||||
assertEquals(1, resultRecord.size());
|
||||
assertInstanceOf(Map.class, resultRecord.get(0));
|
||||
Map recordMap = resultRecord.get(0, Map.class);
|
||||
|
||||
assertEquals(2, recordMap.size());
|
||||
assertInstanceOf(GenericRecord.class, recordMap.get("key1"));
|
||||
assertInstanceOf(GenericRecord.class, recordMap.get("key2"));
|
||||
|
||||
GenericRecord record1 = (GenericRecord) recordMap.get("key1");
|
||||
GenericRecord record2 = (GenericRecord) recordMap.get("key2");
|
||||
|
||||
assertEquals("Test String 1", record1.get(0, String.class));
|
||||
assertEquals(Integer.valueOf(10), record1.get(1, Integer.class));
|
||||
|
||||
assertEquals("Test String 2", record2.get(0, String.class));
|
||||
assertEquals(Integer.valueOf(20), record2.get(1, Integer.class));
|
||||
}
|
||||
|
||||
@DisabledOnOs(WINDOWS)
|
||||
@ParameterizedTest
|
||||
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
|
||||
|
Loading…
x
Reference in New Issue
Block a user