NIFI-10677: Add Choice data type handling to Iceberg record converter

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #6563.
This commit is contained in:
Mark Bathori 2022-10-21 12:58:05 +02:00 committed by Pierre Villard
parent bf24d575b3
commit 4b24bcd2f8
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 94 additions and 8 deletions

View File

@ -19,7 +19,9 @@ package org.apache.nifi.processors.iceberg.converter;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import javax.annotation.Nullable;
import java.io.Serializable;
@ -92,8 +94,21 @@ public class ArrayElementGetter {
case RECORD:
elementGetter = (array, pos) -> DataTypeUtils.toRecord(array[pos], ARRAY_FIELD_NAME);
break;
case CHOICE:
elementGetter = (array, pos) -> {
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
final DataType chosenDataType = DataTypeUtils.chooseDataType(array[pos], choiceDataType);
if (chosenDataType == null) {
throw new IllegalTypeConversionException(String.format(
"Cannot convert value [%s] of type %s for array element to any of the following available Sub-Types for a Choice: %s",
array[pos], array[pos].getClass(), choiceDataType.getPossibleSubTypes()));
}
return DataTypeUtils.convertType(array[pos], chosenDataType, ARRAY_FIELD_NAME);
};
break;
default:
throw new IllegalArgumentException();
throw new IllegalArgumentException("Unsupported field type: " + dataType.getFieldType());
}
return (array, pos) -> {

View File

@ -20,8 +20,10 @@ package org.apache.nifi.processors.iceberg.converter;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import javax.annotation.Nullable;
import java.io.Serializable;
@ -94,8 +96,22 @@ public class RecordFieldGetter {
case RECORD:
fieldGetter = record -> record.getAsRecord(fieldName, ((RecordDataType) dataType).getChildSchema());
break;
case CHOICE:
fieldGetter = record -> {
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
final Object value = record.getValue(fieldName);
final DataType chosenDataType = DataTypeUtils.chooseDataType(value, choiceDataType);
if (chosenDataType == null) {
throw new IllegalTypeConversionException(String.format(
"Cannot convert value [%s] of type %s for field %s to any of the following available Sub-Types for a Choice: %s",
value, value.getClass(), fieldName, choiceDataType.getPossibleSubTypes()));
}
return DataTypeUtils.convertType(record.getValue(fieldName), chosenDataType, fieldName);
};
break;
default:
throw new IllegalArgumentException();
throw new IllegalArgumentException("Unsupported field type: " + dataType.getFieldType());
}
if (!isNullable) {

View File

@ -38,8 +38,11 @@ import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.nifi.processors.iceberg.converter.ArrayElementGetter;
import org.apache.nifi.processors.iceberg.converter.IcebergRecordConverter;
import org.apache.nifi.processors.iceberg.converter.RecordFieldGetter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@ -131,7 +134,8 @@ public class TestIcebergRecordConverter {
Types.NestedField.optional(10, "time", Types.TimeType.get()),
Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()),
Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()),
Types.NestedField.optional(13, "uuid", Types.UUIDType.get())
Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
Types.NestedField.optional(14, "choice", Types.IntegerType.get())
);
private static RecordSchema getStructSchema() {
@ -188,6 +192,16 @@ public class TestIcebergRecordConverter {
fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("timestampTz", RecordFieldType.TIMESTAMP.getDataType()));
fields.add(new RecordField("uuid", RecordFieldType.UUID.getDataType()));
fields.add(new RecordField("choice", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType())));
return new SimpleRecordSchema(fields);
}
private static RecordSchema getChoiceSchema() {
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("string", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("integer", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("float", RecordFieldType.LONG.getDataType()));
return new SimpleRecordSchema(fields);
}
@ -233,7 +247,7 @@ public class TestIcebergRecordConverter {
return new MapRecord(getMapSchema(), values);
}
private static Record setupPrimitivesTestRecord(RecordSchema schema) {
private static Record setupPrimitivesTestRecord() {
LocalDate localDate = LocalDate.of(2017, 4, 4);
LocalTime localTime = LocalTime.of(14, 20, 33);
LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
@ -254,14 +268,24 @@ public class TestIcebergRecordConverter {
values.put("timestamp", Timestamp.from(offsetDateTime.toInstant()));
values.put("timestampTz", Timestamp.valueOf(localDateTime));
values.put("uuid", UUID.fromString("0000-00-00-00-000000"));
values.put("choice", "10");
return new MapRecord(schema, values);
return new MapRecord(getPrimitivesSchema(), values);
}
private static Record setupChoiceTestRecord() {
Map<String, Object> values = new HashMap<>();
values.put("choice1", "20");
values.put("choice2", "30a");
values.put("choice3", String.valueOf(Long.MAX_VALUE));
return new MapRecord(getChoiceSchema(), values);
}
@Test
public void testPrimitivesAvro() throws IOException {
RecordSchema nifiSchema = getPrimitivesSchema();
Record record = setupPrimitivesTestRecord(nifiSchema);
Record record = setupPrimitivesTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.AVRO);
GenericRecord genericRecord = recordConverter.convert(record);
@ -290,13 +314,14 @@ public class TestIcebergRecordConverter {
Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
}
@DisabledOnOs(WINDOWS)
@Test
public void testPrimitivesOrc() throws IOException {
RecordSchema nifiSchema = getPrimitivesSchema();
Record record = setupPrimitivesTestRecord(nifiSchema);
Record record = setupPrimitivesTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.ORC);
GenericRecord genericRecord = recordConverter.convert(record);
@ -325,12 +350,13 @@ public class TestIcebergRecordConverter {
Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
Assertions.assertEquals(resultRecord.get(13, UUID.class), UUID.fromString("0000-00-00-00-000000"));
Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
}
@Test
public void testPrimitivesParquet() throws IOException {
RecordSchema nifiSchema = getPrimitivesSchema();
Record record = setupPrimitivesTestRecord(nifiSchema);
Record record = setupPrimitivesTestRecord();
IcebergRecordConverter recordConverter = new IcebergRecordConverter(PRIMITIVES, nifiSchema, FileFormat.PARQUET);
GenericRecord genericRecord = recordConverter.convert(record);
@ -359,6 +385,7 @@ public class TestIcebergRecordConverter {
Assertions.assertEquals(resultRecord.get(11, OffsetDateTime.class), offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC));
Assertions.assertEquals(resultRecord.get(12, LocalDateTime.class), LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000));
Assertions.assertArrayEquals(resultRecord.get(13, byte[].class), new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});
Assertions.assertEquals(resultRecord.get(14, Integer.class), new Integer(10));
}
@Test
@ -647,6 +674,34 @@ public class TestIcebergRecordConverter {
assertTrue(e.getMessage().contains("java.util.ArrayList cannot be cast"));
}
@Test
public void testChoiceDataTypeInRecord() {
Record record = setupChoiceTestRecord();
DataType dataType = RecordFieldType.CHOICE.getChoiceDataType(
RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType());
RecordFieldGetter.FieldGetter fieldGetter1 = RecordFieldGetter.createFieldGetter(dataType, "choice1", true);
RecordFieldGetter.FieldGetter fieldGetter2 = RecordFieldGetter.createFieldGetter(dataType, "choice2", true);
RecordFieldGetter.FieldGetter fieldGetter3 = RecordFieldGetter.createFieldGetter(dataType, "choice3", true);
Assertions.assertInstanceOf(Integer.class, fieldGetter1.getFieldOrNull(record));
Assertions.assertInstanceOf(String.class, fieldGetter2.getFieldOrNull(record));
Assertions.assertInstanceOf(Long.class, fieldGetter3.getFieldOrNull(record));
}
@Test
public void testChoiceDataTypeInArray() {
DataType dataType = RecordFieldType.CHOICE.getChoiceDataType(
RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType(), RecordFieldType.LONG.getDataType());
ArrayElementGetter.ElementGetter elementGetter = ArrayElementGetter.createElementGetter(dataType);
String[] testArray = {"20", "30a", String.valueOf(Long.MAX_VALUE)};
Assertions.assertInstanceOf(Integer.class, elementGetter.getElementOrNull(testArray, 0));
Assertions.assertInstanceOf(String.class, elementGetter.getElementOrNull(testArray, 1));
Assertions.assertInstanceOf(Long.class, elementGetter.getElementOrNull(testArray, 2));
}
public void writeToAvro(Schema schema, GenericRecord record, OutputFile outputFile) throws IOException {
try (FileAppender<GenericRecord> appender = Avro.write(outputFile)
.schema(schema)