NIFI-11538: This closes #7239. Fix primitive type conversion for PutIceberg

This commit is contained in:
Matthew Burgess 2023-05-11 12:10:47 -04:00 committed by Joe Witt
parent 8b795a72f1
commit 3ebe8f2983
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
4 changed files with 183 additions and 29 deletions

View File

@ -19,15 +19,16 @@ package org.apache.nifi.processors.iceberg.converter;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.Validate;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.LocalTime;
@ -47,45 +48,91 @@ import static org.apache.nifi.processors.iceberg.converter.RecordFieldGetter.cre
*/
public class GenericDataConverters {
// NOTE(review): this span is taken from a rendered commit diff that carries no +/- markers.
// The "SameTypeConverter" header and the bare "return data;" further down look like the
// removed pre-change lines shown next to their replacements — confirm against the repository.
static class SameTypeConverter extends DataConverter<Object, Object> {
/**
 * Converts a record value into the Java representation expected for an Iceberg primitive
 * column by switching on {@code targetType.typeId()} and delegating to the matching
 * {@code DataTypeUtils} conversion method.
 */
static class PrimitiveTypeConverter extends DataConverter<Object, Object> {
// Iceberg primitive type whose typeId() selects the conversion performed by convert()
final Type.PrimitiveType targetType;
// Source data type; convert() only reads its format, and only for the DATE case
final DataType sourceType;
/**
 * @param type Iceberg primitive type that values are converted to
 * @param dataType data type of the source field
 */
public PrimitiveTypeConverter(final Type.PrimitiveType type, final DataType dataType) {
targetType = type;
sourceType = dataType;
}
/**
 * Converts {@code data} according to the target type id.
 *
 * @param data source value
 * @return result of the {@code DataTypeUtils} conversion selected by the target type id
 */
@Override
public Object convert(Object data) {
return data;
// The trailing null argument of the DataTypeUtils calls below is presumably the field
// name used in error messages — TODO confirm against DataTypeUtils.
switch (targetType.typeId()) {
case BOOLEAN:
return DataTypeUtils.toBoolean(data, null);
case INTEGER:
return DataTypeUtils.toInteger(data, null);
case LONG:
return DataTypeUtils.toLong(data, null);
case FLOAT:
return DataTypeUtils.toFloat(data, null);
case DOUBLE:
return DataTypeUtils.toDouble(data, null);
case DATE:
// The formatter is built from the source field's format and the system default zone
return DataTypeUtils.toLocalDate(data, () -> DataTypeUtils.getDateTimeFormatter(sourceType.getFormat(), ZoneId.systemDefault()), null);
case UUID:
return DataTypeUtils.toUUID(data);
case STRING:
default:
// STRING and every type id not listed above are converted with toString;
// the supplier passed here always yields null
return DataTypeUtils.toString(data, () -> null);
}
}
}
// NOTE(review): rendered diff without +/- markers — the DataConverter<Time, LocalTime> header
// and the convert(Time) lines below look like the removed pre-change version; confirm.
static class TimeConverter extends DataConverter<Time, LocalTime> {
/**
 * Converts a source value to {@link LocalTime} by first converting it to a time value with
 * {@code DataTypeUtils.toTime}, using a date format created from the configured pattern.
 */
static class TimeConverter extends DataConverter<Object, LocalTime> {
// Pattern handed to DataTypeUtils.getDateFormat when a value is converted
private final String timeFormat;
/**
 * @param format time pattern of the source field
 */
public TimeConverter(final String format) {
this.timeFormat = format;
}
@Override
public LocalTime convert(Time data) {
return data.toLocalTime();
public LocalTime convert(Object data) {
// NOTE(review): toLocalTime() is invoked directly on the toTime(...) result; if toTime can
// return null (for example for null input) this throws NullPointerException — confirm and guard.
return DataTypeUtils.toTime(data, () -> DataTypeUtils.getDateFormat(timeFormat), null).toLocalTime();
}
}
// NOTE(review): rendered diff without +/- markers — the DataConverter<Timestamp, LocalDateTime>
// header and the convert(Timestamp) lines below look like the removed pre-change version; confirm.
static class TimestampConverter extends DataConverter<Timestamp, LocalDateTime> {
/**
 * Converts a source value to {@link LocalDateTime} by first converting it to a
 * {@link Timestamp} with {@code DataTypeUtils.toTimestamp}, using a date format created from
 * the source data type's format.
 */
static class TimestampConverter extends DataConverter<Object, LocalDateTime> {
// Source data type; only its format is read, when a value is converted
private final DataType dataType;
/**
 * @param dataType data type of the source field
 */
public TimestampConverter(final DataType dataType) {
this.dataType = dataType;
}
@Override
public LocalDateTime convert(Timestamp data) {
return data.toLocalDateTime();
public LocalDateTime convert(Object data) {
final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null);
// NOTE(review): convertedTimestamp is dereferenced without a null check; if toTimestamp can
// return null (for example for null input) this throws NullPointerException — confirm and guard.
return convertedTimestamp.toLocalDateTime();
}
}
// NOTE(review): rendered diff without +/- markers — the DataConverter<Timestamp, OffsetDateTime>
// header and the convert(Timestamp) lines below look like the removed pre-change version; confirm.
static class TimestampWithTimezoneConverter extends DataConverter<Timestamp, OffsetDateTime> {
/**
 * Converts a source value to an {@link OffsetDateTime} in the "UTC" zone by first converting
 * it to a {@link Timestamp} with {@code DataTypeUtils.toTimestamp}, using a date format
 * created from the source data type's format.
 */
static class TimestampWithTimezoneConverter extends DataConverter<Object, OffsetDateTime> {
// Source data type; only its format is read, when a value is converted
private final DataType dataType;
/**
 * @param dataType data type of the source field
 */
public TimestampWithTimezoneConverter(final DataType dataType) {
this.dataType = dataType;
}
@Override
public OffsetDateTime convert(Timestamp data) {
return OffsetDateTime.ofInstant(data.toInstant(), ZoneId.of("UTC"));
public OffsetDateTime convert(Object data) {
final Timestamp convertedTimestamp = DataTypeUtils.toTimestamp(data, () -> DataTypeUtils.getDateFormat(dataType.getFormat()), null);
// NOTE(review): convertedTimestamp is dereferenced without a null check; if toTimestamp can
// return null (for example for null input) this throws NullPointerException — confirm and guard.
return OffsetDateTime.ofInstant(convertedTimestamp.toInstant(), ZoneId.of("UTC"));
}
}
// NOTE(review): rendered diff without +/- markers — the DataConverter<UUID, byte[]> header,
// the convert(UUID) signature and the two putLong(data....) lines look like the removed
// pre-change version shown next to their replacements; confirm.
static class UUIDtoByteArrayConverter extends DataConverter<UUID, byte[]> {
/**
 * Converts a source value to a 16-byte array: the value is converted with
 * {@code DataTypeUtils.toUUID}, then the most significant and least significant 64 bits
 * are written, in that order, into a 16-byte buffer.
 */
static class UUIDtoByteArrayConverter extends DataConverter<Object, byte[]> {
@Override
public byte[] convert(UUID data) {
public byte[] convert(Object data) {
// NOTE(review): uuid is dereferenced below without a null check; if toUUID can return
// null (for example for null input) this throws NullPointerException — confirm and guard.
final UUID uuid = DataTypeUtils.toUUID(data);
// The buffer wraps a fresh 16-byte array, so array() returns exactly the bytes written
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(data.getMostSignificantBits());
byteBuffer.putLong(data.getLeastSignificantBits());
byteBuffer.putLong(uuid.getMostSignificantBits());
byteBuffer.putLong(uuid.getLeastSignificantBits());
return byteBuffer.array();
}
}
@ -113,7 +160,7 @@ public class GenericDataConverters {
}
}
static class BigDecimalConverter extends DataConverter<BigDecimal, BigDecimal> {
static class BigDecimalConverter extends DataConverter<Object, BigDecimal> {
private final int precision;
private final int scale;
@ -123,10 +170,15 @@ public class GenericDataConverters {
}
// NOTE(review): rendered diff without +/- markers — the convert(BigDecimal) signature and the
// three statements directly below it look like the removed pre-change version; confirm.
/**
 * Converts a source value to {@link BigDecimal}. A value that already is a BigDecimal must
 * have exactly the configured scale and a precision not exceeding the configured precision;
 * any other value is passed to {@code DataTypeUtils.toBigDecimal}.
 */
@Override
public BigDecimal convert(BigDecimal data) {
Validate.isTrue(data.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale: %s", precision, scale, data);
Validate.isTrue(data.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision: %s", precision, scale, data);
return data;
public BigDecimal convert(Object data) {
if (data instanceof BigDecimal) {
BigDecimal bigDecimal = (BigDecimal) data;
Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data);
Validate.isTrue(bigDecimal.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision %s for value: %s",
precision, scale, bigDecimal.precision(), data);
return bigDecimal;
}
// NOTE(review): values that are not already BigDecimal are returned straight from
// toBigDecimal without the scale/precision checks applied above — confirm this is intended.
return DataTypeUtils.toBigDecimal(data, null);
}
}

View File

@ -46,7 +46,6 @@ import java.util.stream.Collectors;
public class IcebergRecordConverter {
private final DataConverter<Record, GenericRecord> converter;
/**
 * Converts a NiFi record by delegating to the record-level converter held in {@code converter}.
 *
 * @param record record to convert
 * @return the {@link GenericRecord} produced by the converter
 */
public GenericRecord convert(Record record) {
return converter.convert(record);
}
@ -85,21 +84,21 @@ public class IcebergRecordConverter {
case DOUBLE:
case DATE:
case STRING:
return new GenericDataConverters.SameTypeConverter();
return new GenericDataConverters.PrimitiveTypeConverter(type, dataType);
case TIME:
return new GenericDataConverters.TimeConverter();
return new GenericDataConverters.TimeConverter(dataType.getFormat());
case TIMESTAMP:
final Types.TimestampType timestampType = (Types.TimestampType) type;
if (timestampType.shouldAdjustToUTC()) {
return new GenericDataConverters.TimestampWithTimezoneConverter();
return new GenericDataConverters.TimestampWithTimezoneConverter(dataType);
}
return new GenericDataConverters.TimestampConverter();
return new GenericDataConverters.TimestampConverter(dataType);
case UUID:
final UUIDDataType uuidType = (UUIDDataType) dataType;
if (uuidType.getFileFormat() == FileFormat.PARQUET) {
return new GenericDataConverters.UUIDtoByteArrayConverter();
}
return new GenericDataConverters.SameTypeConverter();
return new GenericDataConverters.PrimitiveTypeConverter(type, dataType);
case FIXED:
final Types.FixedType fixedType = (Types.FixedType) type;
return new GenericDataConverters.FixedConverter(fixedType.length());
@ -167,7 +166,9 @@ public class IcebergRecordConverter {
return new RecordTypeWithFieldNameMapper(new Schema(schema.findField(fieldId).type().asStructType().fields()), (RecordDataType) field.getDataType());
}
if (field.getDataType().getFieldType().equals(RecordFieldType.UUID)) {
// If the source field or target field is of type UUID, create a UUIDDataType from it
if (field.getDataType().getFieldType().equals(RecordFieldType.UUID)
|| schema.findField(fieldId).type().typeId() == Type.TypeID.UUID) {
return new UUIDDataType(field.getDataType(), fileFormat);
}

View File

@ -67,6 +67,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
@ -143,6 +144,23 @@ public class TestIcebergRecordConverter {
Types.NestedField.optional(14, "choice", Types.IntegerType.get())
);
// Iceberg schema of optional primitive-typed columns used as the conversion target for the
// "compatible primitives" test data.
// Field id 6 is not used in this schema.
// NOTE(review): the column named "timestamp" is declared with zone and the column named
// "timestampTz" without zone, which reads as the opposite of the names — confirm this is intended.
private static final Schema COMPATIBLE_PRIMITIVES_SCHEMA = new Schema(
Types.NestedField.optional(0, "string", Types.StringType.get()),
Types.NestedField.optional(1, "integer", Types.IntegerType.get()),
Types.NestedField.optional(2, "float", Types.FloatType.get()),
Types.NestedField.optional(3, "long", Types.LongType.get()),
Types.NestedField.optional(4, "double", Types.DoubleType.get()),
Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 2)),
Types.NestedField.optional(7, "fixed", Types.FixedType.ofLength(5)),
Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
Types.NestedField.optional(9, "date", Types.DateType.get()),
Types.NestedField.optional(10, "time", Types.TimeType.get()),
Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()),
Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()),
Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
Types.NestedField.optional(14, "choice", Types.IntegerType.get())
);
private static final Schema CASE_INSENSITIVE_SCHEMA = new Schema(
Types.NestedField.optional(0, "FIELD1", Types.StringType.get()),
Types.NestedField.optional(1, "Field2", Types.StringType.get()),
@ -221,6 +239,26 @@ public class TestIcebergRecordConverter {
return new SimpleRecordSchema(fields);
}
/**
 * Builds a NiFi record schema whose field types deliberately differ from the Iceberg column
 * types they are written to (for example an INT field for a string column, or STRING fields
 * carrying a format for the date/time columns), so that type conversion is exercised.
 *
 * @return the record schema describing the "compatible primitives" test record
 */
private static RecordSchema getPrimitivesAsCompatiblesSchema() {
    List<RecordField> fields = new ArrayList<>();
    fields.add(new RecordField("string", RecordFieldType.INT.getDataType()));
    fields.add(new RecordField("integer", RecordFieldType.SHORT.getDataType()));
    fields.add(new RecordField("float", RecordFieldType.DOUBLE.getDataType()));
    fields.add(new RecordField("long", RecordFieldType.INT.getDataType()));
    fields.add(new RecordField("double", RecordFieldType.FLOAT.getDataType()));
    fields.add(new RecordField("decimal", RecordFieldType.DOUBLE.getDataType()));
    fields.add(new RecordField("fixed", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
    fields.add(new RecordField("binary", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
    fields.add(new RecordField("date", RecordFieldType.STRING.getDataType("yyyy-MM-dd")));
    // "HH" is the 24-hour-clock pattern letter. The test values carry hours such as 14, which
    // the 12-hour letter "hh" only accepts through lenient parsing.
    fields.add(new RecordField("time", RecordFieldType.STRING.getDataType("HH:mm:ss.SSS")));
    fields.add(new RecordField("timestamp", RecordFieldType.STRING.getDataType("yyyy-MM-dd HH:mm:ss.SSSZ")));
    fields.add(new RecordField("timestampTz", RecordFieldType.STRING.getDataType("yyyy-MM-dd HH:mm:ss.SSSZ")));
    fields.add(new RecordField("uuid", RecordFieldType.STRING.getDataType()));
    fields.add(new RecordField("choice", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.STRING.getDataType(), RecordFieldType.INT.getDataType())));

    return new SimpleRecordSchema(fields);
}
private static RecordSchema getCaseInsensitiveSchema() {
List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("field1", RecordFieldType.STRING.getDataType()));
@ -331,6 +369,27 @@ public class TestIcebergRecordConverter {
return new MapRecord(getPrimitivesSchema(), values);
}
/**
 * Builds the test record for the "compatible primitives" schema. The values are supplied in
 * representations that differ from the target Iceberg column types (numbers for a string
 * column, strings for date/time/uuid columns, and so on).
 *
 * @return a record backed by {@code getPrimitivesAsCompatiblesSchema()} and the values below
 */
private static Record setupCompatiblePrimitivesTestRecord() {
    // Encode with an explicit charset so the bytes do not depend on the platform default.
    // The fully qualified name avoids requiring a new import in this file.
    final byte[] helloBytes = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    Map<String, Object> values = new HashMap<>();
    values.put("string", 123);
    values.put("integer", 8);
    values.put("float", 1.23456);
    values.put("long", 42L);
    values.put("double", 3.14159);
    values.put("decimal", 12345678.12);
    // Separate array instances per field, as two independent getBytes() calls would produce
    values.put("fixed", helloBytes.clone());
    values.put("binary", helloBytes.clone());
    values.put("date", "2017-04-04");
    values.put("time", "14:20:33.000");
    values.put("timestamp", "2017-04-04 14:20:33.789-0500");
    values.put("timestampTz", "2017-04-04 14:20:33.789-0500");
    values.put("uuid", "0000-00-00-00-000000");
    values.put("choice", "10");

    return new MapRecord(getPrimitivesAsCompatiblesSchema(), values);
}
private static Record setupCaseInsensitiveTestRecord() {
Map<String, Object> values = new HashMap<>();
values.put("field1", "Text1");
@ -414,6 +473,48 @@ public class TestIcebergRecordConverter {
}
}
/**
 * Verifies that a record whose NiFi field types differ from the Iceberg column types is
 * converted, written and read back with the expected values, for each listed file format.
 *
 * @param format file format used to write and read the converted record
 * @throws IOException declared for the write/read helpers used by the test
 */
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
public void testCompatiblePrimitives(FileFormat format) throws IOException {
    RecordSchema nifiSchema = getPrimitivesAsCompatiblesSchema();
    Record record = setupCompatiblePrimitivesTestRecord();

    IcebergRecordConverter recordConverter = new IcebergRecordConverter(COMPATIBLE_PRIMITIVES_SCHEMA, nifiSchema, format);
    GenericRecord genericRecord = recordConverter.convert(record);

    writeTo(format, COMPATIBLE_PRIMITIVES_SCHEMA, genericRecord, tempFile);

    List<GenericRecord> results = readFrom(format, COMPATIBLE_PRIMITIVES_SCHEMA, tempFile.toInputFile());

    // assertEquals takes (expected, actual); with the arguments swapped a failure message
    // would report the two values the wrong way round.
    assertEquals(1, results.size());
    GenericRecord resultRecord = results.get(0);

    // The source timestamp string is 2017-04-04 14:20:33.789 at offset -05:00
    LocalDateTime localDateTime = LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000);
    OffsetDateTime offsetDateTime = OffsetDateTime.of(localDateTime, ZoneOffset.ofHours(-5));
    // The zone-less column is expected to hold the same instant expressed in the system default zone
    LocalDateTime expectedLocalDateTimestamp = offsetDateTime.atZoneSameInstant(ZoneId.systemDefault()).toLocalDateTime();

    assertEquals("123", resultRecord.get(0, String.class));
    assertEquals(Integer.valueOf(8), resultRecord.get(1, Integer.class));
    assertEquals(Float.valueOf(1.23456F), resultRecord.get(2, Float.class));
    assertEquals(Long.valueOf(42L), resultRecord.get(3, Long.class));
    // Presumably 3.14159 after passing through float precision, since the source field is declared FLOAT
    assertEquals(Double.valueOf(3.141590118408203), resultRecord.get(4, Double.class));
    assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
    assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(6, byte[].class));
    assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, ByteBuffer.class).array());
    assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(8, LocalDate.class));
    assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(9, LocalTime.class));
    assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(10, OffsetDateTime.class));
    assertEquals(expectedLocalDateTimestamp, resultRecord.get(11, LocalDateTime.class));
    assertEquals(Integer.valueOf(10), resultRecord.get(13, Integer.class));

    // For PARQUET the UUID column is read back as a 16-byte array; otherwise as a UUID.
    // Enum constants are compared by identity.
    if (format == PARQUET) {
        assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(12, byte[].class));
    } else {
        assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(12, UUID.class));
    }
}
@DisabledOnOs(WINDOWS)
@ParameterizedTest
@EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})

View File

@ -19,7 +19,7 @@
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": ["int", "null"]},
{"name": "id", "type": ["long", "null"]},
{"name": "name", "type": ["string", "null"]},
{"name": "department", "type": ["string", "null"]}
]