NIFI-12847: Add Enum data type handling to Iceberg record converter

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8453.
This commit is contained in:
Mark Bathori 2024-02-27 13:47:19 +01:00 committed by Pierre Villard
parent f119c49c4d
commit c29a744644
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
6 changed files with 36 additions and 11 deletions

View File

@ -1103,7 +1103,7 @@ public class DataTypeUtils {
return enumType.getEnums() != null && enumType.getEnums().contains(value);
}
private static Object toEnum(Object value, EnumDataType dataType, String fieldName) {
public static Object toEnum(Object value, EnumDataType dataType, String fieldName) {
if(dataType.getEnums() != null && dataType.getEnums().contains(value)) {
return value.toString();
}

View File

@ -22,6 +22,7 @@ import org.apache.nifi.serialization.record.field.FieldConverter;
import org.apache.nifi.serialization.record.field.StandardFieldConverterRegistry;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.EnumDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
@ -96,6 +97,9 @@ public class ArrayElementGetter {
return converter.convertField(element, Optional.ofNullable(dataType.getFormat()), ARRAY_FIELD_NAME);
};
break;
case ENUM:
elementGetter = element -> DataTypeUtils.toEnum(element, (EnumDataType) dataType, ARRAY_FIELD_NAME);
break;
case UUID:
elementGetter = DataTypeUtils::toUUID;
break;

View File

@ -23,6 +23,7 @@ import org.apache.nifi.serialization.record.field.FieldConverter;
import org.apache.nifi.serialization.record.field.StandardFieldConverterRegistry;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.EnumDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
@ -101,6 +102,9 @@ public class RecordFieldGetter {
case UUID:
fieldGetter = record -> DataTypeUtils.toUUID(record.getValue(fieldName));
break;
case ENUM:
fieldGetter = record -> DataTypeUtils.toEnum(record.getValue(fieldName), (EnumDataType) dataType, fieldName);
break;
case ARRAY:
fieldGetter = record -> DataTypeUtils.toArray(record.getValue(fieldName), fieldName, ((ArrayDataType) dataType).getElementType());
break;

View File

@ -166,7 +166,8 @@ public class TestIcebergRecordConverter {
Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()),
Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()),
Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
Types.NestedField.optional(14, "choice", Types.IntegerType.get())
Types.NestedField.optional(14, "choice", Types.IntegerType.get()),
Types.NestedField.optional(15, "enum", Types.StringType.get())
);
private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new Schema(
@ -294,6 +295,7 @@ public class TestIcebergRecordConverter {
fields.add(new RecordField("timestampTz", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("uuid", RecordFieldType.UUID.getDataType()));
fields.add(new RecordField("choice", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType())));
fields.add(new RecordField("enum", RecordFieldType.ENUM.getEnumDataType(Arrays.asList("red", "blue", "yellow"))));
return new SimpleRecordSchema(fields);
}
@ -469,6 +471,7 @@ public class TestIcebergRecordConverter {
values.put("timestampTz", Timestamp.valueOf(LOCAL_DATE_TIME));
values.put("uuid", UUID.fromString("0000-00-00-00-000000"));
values.put("choice", "10");
values.put("enum", "blue");
return new MapRecord(getPrimitivesSchema(), values);
}
@ -590,6 +593,7 @@ public class TestIcebergRecordConverter {
assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
assertEquals(LOCAL_DATE_TIME, resultRecord.get(12, LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
assertEquals("blue", resultRecord.get(15, String.class));
if (format.equals(PARQUET)) {
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));

View File

@ -109,7 +109,7 @@ public class TestPutIcebergWithHiveCatalog {
RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema);
for (RecordField recordField : recordSchema.getFields()) {
readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
readerFactory.addSchemaField(recordField);
}
readerFactory.addRecord(0, "John", "Finance");

View File

@ -15,12 +15,25 @@
* limitations under the License.
*/
{
"namespace": "nifi",
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": ["long", "null"]},
{"name": "name", "type": ["string", "null"]},
{"name": "department", "type": ["string", "null"]}
]
"namespace": "nifi",
"type": "record",
"name": "User",
"fields": [
{
"name": "id",
"type": ["long", "null"]
},
{
"name": "name",
"type": ["string", "null"]
},
{
"name": "department",
"type": {
"name": "Department",
"type": "enum",
"symbols": ["Finance", "Marketing", "Sales"]
}
}
]
}