From 4b24bcd2f823abe1534ab3392245c9c60a1293ee Mon Sep 17 00:00:00 2001 From: Mark Bathori Date: Fri, 21 Oct 2022 12:58:05 +0200 Subject: [PATCH] NIFI-10677: Add Choice data type handling to Iceberg record converter Signed-off-by: Pierre Villard This closes #6563. --- .../iceberg/converter/ArrayElementGetter.java | 17 ++++- .../iceberg/converter/RecordFieldGetter.java | 18 ++++- .../iceberg/TestIcebergRecordConverter.java | 67 +++++++++++++++++-- 3 files changed, 94 insertions(+), 8 deletions(-) diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java index 9a98404b26..a344575973 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/ArrayElementGetter.java @@ -19,7 +19,9 @@ package org.apache.nifi.processors.iceberg.converter; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; import javax.annotation.Nullable; import java.io.Serializable; @@ -92,8 +94,21 @@ public class ArrayElementGetter { case RECORD: elementGetter = (array, pos) -> DataTypeUtils.toRecord(array[pos], ARRAY_FIELD_NAME); break; + case CHOICE: + elementGetter = (array, pos) -> { + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + final DataType chosenDataType = DataTypeUtils.chooseDataType(array[pos], choiceDataType); + if (chosenDataType == null) { + throw new IllegalTypeConversionException(String.format( + "Cannot convert value [%s] of type %s for array element to any of the following available Sub-Types for a Choice: %s", + array[pos], array[pos].getClass(), choiceDataType.getPossibleSubTypes())); + } + + return DataTypeUtils.convertType(array[pos], chosenDataType, ARRAY_FIELD_NAME); + }; + break; default: - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Unsupported field type: " + dataType.getFieldType()); } return (array, pos) -> { diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java index ca50c49f89..24a21d6ef7 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/RecordFieldGetter.java @@ -20,8 +20,10 @@ package org.apache.nifi.processors.iceberg.converter; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; import javax.annotation.Nullable; import java.io.Serializable; @@ -94,8 +96,22 @@ public class RecordFieldGetter { case RECORD: fieldGetter = record -> record.getAsRecord(fieldName, ((RecordDataType) dataType).getChildSchema()); break; + case CHOICE: + fieldGetter = record -> { + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + final Object value = record.getValue(fieldName); + final DataType chosenDataType = DataTypeUtils.chooseDataType(value, choiceDataType); + if (chosenDataType == null) { + throw new IllegalTypeConversionException(String.format( + "Cannot convert value [%s] of type %s for field %s to any of the following available Sub-Types for a Choice: %s", + value, value.getClass(), fieldName, choiceDataType.getPossibleSubTypes())); + } + + return DataTypeUtils.convertType(record.getValue(fieldName), chosenDataType, fieldName); + }; + break; default: - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Unsupported field type: " + dataType.getFieldType()); } if (!isNullable) { diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java index 1308316e16..759b0bc576 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java @@ -38,8 +38,11 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.nifi.processors.iceberg.converter.ArrayElementGetter; import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter; +import org.apache.nifi.processors.iceberg.converter.RecordFieldGetter; import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; @@ -131,7 +134,8 @@ public class TestIcebergRecordConverter { Types.NestedField.optional(10, "time", Types.TimeType.get()), Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()), Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()), - Types.NestedField.optional(13, "uuid", Types.UUIDType.get()) + Types.NestedField.optional(13, "uuid", Types.UUIDType.get()), + Types.NestedField.optional(14, "choice", Types.IntegerType.get()) ); private static RecordSchema getStructSchema() { @@ -188,6 +192,16 @@ public class TestIcebergRecordConverter { fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType())); fields.add(new RecordField("timestampTz", RecordFieldType.TIMESTAMP.getDataType())); fields.add(new RecordField("uuid", RecordFieldType.UUID.getDataType())); + fields.add(new RecordField("choice", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType()))); + + return new SimpleRecordSchema(fields); + } + + private static RecordSchema getChoiceSchema() { + List fields = new ArrayList<>(); + fields.add(new RecordField("string", RecordFieldType.INT.getDataType())); + fields.add(new RecordField("integer", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("float", RecordFieldType.LONG.getDataType())); return new SimpleRecordSchema(fields); } @@ -233,7 +247,7 @@ public class TestIcebergRecordConverter { return new MapRecord(getMapSchema(), values); } - private static Record setupPrimitivesTestRecord(RecordSchema schema) { + private static Record setupPrimitivesTestRecord() { LocalDate localDate = LocalDate.of(2017, 4, 4); LocalTime localTime = LocalTime.of(14, 20, 33); LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000); @@ -254,14 +268,24 @@ public class TestIcebergRecordConverter { values.put("timestamp", Timestamp.from(offsetDateTime.toInstant())); values.put("timestampTz", Timestamp.valueOf(localDateTime)); values.put("uuid", UUID.fromString("0000-00-00-00-000000")); + values.put("choice", "10"); - return new MapRecord(schema, values); + return new MapRecord(getPrimitivesSchema(), values); + } + + private static Record setupChoiceTestRecord() { + Map values = new HashMap<>(); + values.put("choice1", "20"); + values.put("choice2", "30a"); + values.put("choice3", String.valueOf(Long.MAX_VALUE)); + + return new MapRecord(getChoiceSchema(), values); } @Test public void testPrimitivesAvro() throws IOException { RecordSchema nifiSchema = getPrimitivesSchema(); - Record record = setupPrimitivesTestRecord(nifiSchema); + Record record = setupPrimitivesTestRecord(); IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.AVRO); GenericRecord genericRecord = recordConverter.convert(record); @@ -290,13 +314,14 @@ public class TestIcebergRecordConverter { Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC)); Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000)); Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000")); + Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10)); } @DisabledOnOs(WINDOWS) @Test public void testPrimitivesOrc() throws IOException { RecordSchema nifiSchema = getPrimitivesSchema(); - Record record = setupPrimitivesTestRecord(nifiSchema); + Record record = setupPrimitivesTestRecord(); IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.ORC); GenericRecord genericRecord = recordConverter.convert(record); @@ -325,12 +350,13 @@ public class TestIcebergRecordConverter { Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC)); Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000)); Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000")); + Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10)); } @Test public void testPrimitivesParquet() throws IOException { RecordSchema nifiSchema = getPrimitivesSchema(); - Record record = setupPrimitivesTestRecord(nifiSchema); + Record record = setupPrimitivesTestRecord(); IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.PARQUET); GenericRecord genericRecord = recordConverter.convert(record); @@ -359,6 +385,7 @@ public class TestIcebergRecordConverter { Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC)); Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000)); Assertions.assertArrayEquals(resultRecord.get(13, byte[].class), new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}); + Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10)); } @Test @@ -647,6 +674,34 @@ public class TestIcebergRecordConverter { assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast")); } + @Test + public void testChoiceDataTypeInRecord() { + Record record = setupChoiceTestRecord(); + DataType dataType = RecordFieldType.CHOICE.getChoiceDataType( + RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType()); + + RecordFieldGetter.FieldGetter fieldGetter1 = RecordFieldGetter.createFieldGetter(dataType, "choice1", true); + RecordFieldGetter.FieldGetter fieldGetter2 = RecordFieldGetter.createFieldGetter(dataType, "choice2", true); + RecordFieldGetter.FieldGetter fieldGetter3 = RecordFieldGetter.createFieldGetter(dataType, "choice3", true); + + Assertions.assertInstanceOf(Integer.class, fieldGetter1.getFieldOrNull(record)); + Assertions.assertInstanceOf(String.class, fieldGetter2.getFieldOrNull(record)); + Assertions.assertInstanceOf(Long.class, fieldGetter3.getFieldOrNull(record)); + } + + @Test + public void testChoiceDataTypeInArray() { + DataType dataType = RecordFieldType.CHOICE.getChoiceDataType( + RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType()); + ArrayElementGetter.ElementGetter elementGetter = ArrayElementGetter.createElementGetter(dataType); + + String[] testArray = {"20", "30a", String.valueOf(Long.MAX_VALUE)}; + + Assertions.assertInstanceOf(Integer.class, elementGetter.getElementOrNull(testArray, 0)); + Assertions.assertInstanceOf(String.class, elementGetter.getElementOrNull(testArray, 1)); + Assertions.assertInstanceOf(Long.class, elementGetter.getElementOrNull(testArray, 2)); + } + public void writeToAvro(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException { try (FileAppender appender = Avro.write(outputFile) .schema(schema)