From c29a744644134bb122dbbddc3eb3d6ba3d98508a Mon Sep 17 00:00:00 2001 From: Mark Bathori Date: Tue, 27 Feb 2024 13:47:19 +0100 Subject: [PATCH] NIFI-12847: Add Enum data type handling to Iceberg record converter Signed-off-by: Pierre Villard This closes #8453. --- .../record/util/DataTypeUtils.java | 2 +- .../iceberg/converter/ArrayElementGetter.java | 4 +++ .../iceberg/converter/RecordFieldGetter.java | 4 +++ .../iceberg/TestIcebergRecordConverter.java | 6 +++- .../TestPutIcebergWithHiveCatalog.java | 2 +- .../src/test/resources/user.avsc | 29 ++++++++++++++----- 6 files changed, 36 insertions(+), 11 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index 000435e410..d45b5053c3 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -1103,7 +1103,7 @@ public class DataTypeUtils { return enumType.getEnums() != null && enumType.getEnums().contains(value); } - private static Object toEnum(Object value, EnumDataType dataType, String fieldName) { + public static Object toEnum(Object value, EnumDataType dataType, String fieldName) { if(dataType.getEnums() != null && dataType.getEnums().contains(value)) { return value.toString(); } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java index f03c8f64a0..7e0be786e7 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java @@ -22,6 +22,7 @@ import org.apache.nifi.serialization.record.field.FieldConverter; import org.apache.nifi.serialization.record.field.StandardFieldConverterRegistry; import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.type.EnumDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; @@ -96,6 +97,9 @@ public class ArrayElementGetter { return converter.convertField(element, Optional.ofNullable(dataType.getFormat()), ARRAY_FIELD_NAME); }; break; + case ENUM: + elementGetter = element -> DataTypeUtils.toEnum(element, (EnumDataType) dataType, ARRAY_FIELD_NAME); + break; case UUID: elementGetter = DataTypeUtils::toUUID; break; diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java index 2b7c7c09da..d0f9d55d87 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java @@ -23,6 +23,7 @@ import org.apache.nifi.serialization.record.field.FieldConverter; import org.apache.nifi.serialization.record.field.StandardFieldConverterRegistry; import org.apache.nifi.serialization.record.type.ArrayDataType; import org.apache.nifi.serialization.record.type.ChoiceDataType; +import org.apache.nifi.serialization.record.type.EnumDataType; import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; @@ -101,6 +102,9 @@ public class RecordFieldGetter { case UUID: fieldGetter = record -> DataTypeUtils.toUUID(record.getValue(fieldName)); break; + case ENUM: + fieldGetter = record -> DataTypeUtils.toEnum(record.getValue(fieldName), (EnumDataType) dataType, fieldName); + break; case ARRAY: fieldGetter = record -> DataTypeUtils.toArray(record.getValue(fieldName), fieldName, ((ArrayDataType) dataType).getElementType()); break; diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java index b067a9202d..48915ba7f2 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java @@ -166,7 +166,8 @@ public class TestIcebergRecordConverter { Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()), Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()), Types.NestedField.optional(13, "uuid", Types.UUIDType.get()), - Types.NestedField.optional(14, "choice", Types.IntegerType.get()) + Types.NestedField.optional(14, "choice", Types.IntegerType.get()), + Types.NestedField.optional(15, "enum", Types.StringType.get()) ); private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new Schema( @@ -294,6 +295,7 @@ public class TestIcebergRecordConverter { fields.add(new RecordField("timestampTz", RecordFieldType.TIMESTAMP.getDataType())); fields.add(new RecordField("uuid", RecordFieldType.UUID.getDataType())); fields.add(new RecordField("choice", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType()))); + fields.add(new RecordField("enum", RecordFieldType.ENUM.getEnumDataType(Arrays.asList("red", "blue", "yellow")))); return new SimpleRecordSchema(fields); } @@ -469,6 +471,7 @@ public class TestIcebergRecordConverter { values.put("timestampTz", Timestamp.valueOf(LOCAL_DATE_TIME)); values.put("uuid", UUID.fromString("0000-00-00-00-000000")); values.put("choice", "10"); + values.put("enum", "blue"); return new MapRecord(getPrimitivesSchema(), values); } @@ -590,6 +593,7 @@ public class TestIcebergRecordConverter { assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class)); assertEquals(LOCAL_DATE_TIME, resultRecord.get(12, LocalDateTime.class)); assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class)); + assertEquals("blue", resultRecord.get(15, String.class)); if (format.equals(PARQUET)) { assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class)); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java index c61925050c..d2b942cf58 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java @@ -109,7 +109,7 @@ public class TestPutIcebergWithHiveCatalog { RecordSchema recordSchema = AvroTypeUtil.createSchema(inputSchema); for (RecordField recordField : recordSchema.getFields()) { - readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable()); + readerFactory.addSchemaField(recordField); } readerFactory.addRecord(0, "John", "Finance"); diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc index c537a9e496..799c0023b4 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/resources/user.avsc @@ -15,12 +15,25 @@ * limitations under the License. */ { - "namespace": "nifi", - "type": "record", - "name": "User", - "fields": [ - {"name": "id", "type": ["long", "null"]}, - {"name": "name", "type": ["string", "null"]}, - {"name": "department", "type": ["string", "null"]} - ] + "namespace": "nifi", + "type": "record", + "name": "User", + "fields": [ + { + "name": "id", + "type": ["long", "null"] + }, + { + "name": "name", + "type": ["string", "null"] + }, + { + "name": "department", + "type": { + "name": "Department", + "type": "enum", + "symbols": ["Finance", "Marketing", "Sales"] + } + } + ] }