NIFI-13593 PutIceberg issue with decimal scale

This closes #9121

Signed-off-by: Mark Bathori <mbathori@apache.org>
This commit is contained in:
Krisztina Zsihovszki 2024-07-29 18:13:11 +02:00 committed by Mark Bathori
parent 7e200fa34e
commit b3c6d8d98a
No known key found for this signature in database
GPG Key ID: 32DC6BA5A13FAA04
2 changed files with 55 additions and 47 deletions

View File

@ -188,13 +188,17 @@ public class GenericDataConverters {
if (data == null) {
return null;
}
if (data instanceof BigDecimal bigDecimal) {
Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data);
Validate.isTrue(bigDecimal.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision %s for value: %s",
precision, scale, bigDecimal.precision(), data);
return bigDecimal;
BigDecimal bigDecimal = DataTypeUtils.toBigDecimal(data, null);
if (bigDecimal.scale() < scale) {
bigDecimal = bigDecimal.setScale(scale);
}
return DataTypeUtils.toBigDecimal(data, null);
Validate.isTrue(bigDecimal.scale() == scale, "Cannot write value as decimal(%s,%s), wrong scale %s for value: %s", precision, scale, bigDecimal.scale(), data);
Validate.isTrue(bigDecimal.precision() <= precision, "Cannot write value as decimal(%s,%s), invalid precision %s for value: %s",
precision, scale, bigDecimal.precision(), data);
return bigDecimal;
}
}

View File

@ -156,16 +156,17 @@ public class TestIcebergRecordConverter {
Types.NestedField.optional(3, "long", Types.LongType.get()),
Types.NestedField.optional(4, "double", Types.DoubleType.get()),
Types.NestedField.optional(5, "decimal", Types.DecimalType.of(10, 2)),
Types.NestedField.optional(6, "boolean", Types.BooleanType.get()),
Types.NestedField.optional(7, "fixed", Types.FixedType.ofLength(5)),
Types.NestedField.optional(8, "binary", Types.BinaryType.get()),
Types.NestedField.optional(9, "date", Types.DateType.get()),
Types.NestedField.optional(10, "time", Types.TimeType.get()),
Types.NestedField.optional(11, "timestamp", Types.TimestampType.withZone()),
Types.NestedField.optional(12, "timestampTz", Types.TimestampType.withoutZone()),
Types.NestedField.optional(13, "uuid", Types.UUIDType.get()),
Types.NestedField.optional(14, "choice", Types.IntegerType.get()),
Types.NestedField.optional(15, "enum", Types.StringType.get())
Types.NestedField.optional(6, "decimalLowerScore", Types.DecimalType.of(10, 2)),
Types.NestedField.optional(7, "boolean", Types.BooleanType.get()),
Types.NestedField.optional(8, "fixed", Types.FixedType.ofLength(5)),
Types.NestedField.optional(9, "binary", Types.BinaryType.get()),
Types.NestedField.optional(10, "date", Types.DateType.get()),
Types.NestedField.optional(11, "time", Types.TimeType.get()),
Types.NestedField.optional(12, "timestamp", Types.TimestampType.withZone()),
Types.NestedField.optional(13, "timestampTz", Types.TimestampType.withoutZone()),
Types.NestedField.optional(14, "uuid", Types.UUIDType.get()),
Types.NestedField.optional(15, "choice", Types.IntegerType.get()),
Types.NestedField.optional(16, "enum", Types.StringType.get())
);
private static final Schema PRIMITIVES_SCHEMA_WITH_REQUIRED_FIELDS = new Schema(
@ -284,6 +285,7 @@ public class TestIcebergRecordConverter {
fields.add(new RecordField("long", RecordFieldType.LONG.getDataType()));
fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
fields.add(new RecordField("decimal", RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
fields.add(new RecordField("decimalLowerScore", RecordFieldType.DECIMAL.getDecimalDataType(10, 2)));
fields.add(new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()));
fields.add(new RecordField("fixed", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
fields.add(new RecordField("binary", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())));
@ -460,6 +462,7 @@ public class TestIcebergRecordConverter {
values.put("long", 42L);
values.put("double", 3.14159D);
values.put("decimal", new BigDecimal("12345678.12"));
values.put("decimalLowerScore", 12345678.1);
values.put("boolean", true);
values.put("fixed", "hello".getBytes());
values.put("binary", "hello".getBytes());
@ -583,20 +586,21 @@ public class TestIcebergRecordConverter {
assertEquals(Long.valueOf(42L), resultRecord.get(3, Long.class));
assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class));
assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array());
assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class));
assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class));
assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
assertEquals(LOCAL_DATE_TIME, resultRecord.get(12, LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
assertEquals("blue", resultRecord.get(15, String.class));
assertEquals(new BigDecimal("12345678.10"), resultRecord.get(6, BigDecimal.class));
assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, byte[].class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(9, ByteBuffer.class).array());
assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10, LocalDate.class));
assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11, LocalTime.class));
assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(12, OffsetDateTime.class));
assertEquals(LOCAL_DATE_TIME, resultRecord.get(13, LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
assertEquals("blue", resultRecord.get(16, String.class));
if (format.equals(PARQUET)) {
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(13, byte[].class));
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, resultRecord.get(14, byte[].class));
} else {
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(14, UUID.class));
}
}
@ -626,14 +630,14 @@ public class TestIcebergRecordConverter {
assertNull(resultRecord.get(3, Long.class));
assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class));
assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array());
assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class));
assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class));
assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
assertEquals(LOCAL_DATE_TIME, resultRecord.get(12, LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, byte[].class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(9, ByteBuffer.class).array());
assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10, LocalDate.class));
assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11, LocalTime.class));
assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(12, OffsetDateTime.class));
assertEquals(LOCAL_DATE_TIME, resultRecord.get(13, LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
if (format.equals(FileFormat.PARQUET)) {
// Parquet uses a conversion to the byte values of numeric characters such as "0" -> byte value 0
@ -641,9 +645,9 @@ public class TestIcebergRecordConverter {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(uuid.getMostSignificantBits());
byteBuffer.putLong(uuid.getLeastSignificantBits());
assertArrayEquals(byteBuffer.array(), resultRecord.get(13, byte[].class));
assertArrayEquals(byteBuffer.array(), resultRecord.get(14, byte[].class));
} else {
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(14, UUID.class));
}
// Test null values
@ -711,14 +715,14 @@ public class TestIcebergRecordConverter {
assertNull(resultRecord.get(3, Long.class));
assertEquals(Double.valueOf(3.14159D), resultRecord.get(4, Double.class));
assertEquals(new BigDecimal("12345678.12"), resultRecord.get(5, BigDecimal.class));
assertEquals(Boolean.TRUE, resultRecord.get(6, Boolean.class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(7, byte[].class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, ByteBuffer.class).array());
assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(9, LocalDate.class));
assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(10, LocalTime.class));
assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(11, OffsetDateTime.class));
assertEquals(LOCAL_DATE_TIME, resultRecord.get(12, LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
assertEquals(Boolean.TRUE, resultRecord.get(7, Boolean.class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(8, byte[].class));
assertArrayEquals(new byte[]{104, 101, 108, 108, 111}, resultRecord.get(9, ByteBuffer.class).array());
assertEquals(LocalDate.of(2017, 4, 4), resultRecord.get(10, LocalDate.class));
assertEquals(LocalTime.of(14, 20, 33), resultRecord.get(11, LocalTime.class));
assertEquals(offsetDateTime.withOffsetSameInstant(ZoneOffset.UTC), resultRecord.get(12, OffsetDateTime.class));
assertEquals(LOCAL_DATE_TIME, resultRecord.get(13, LocalDateTime.class));
assertEquals(Integer.valueOf(10), resultRecord.get(15, Integer.class));
if (format.equals(FileFormat.PARQUET)) {
// Parquet uses a conversion to the byte values of numeric characters such as "0" -> byte value 0
@ -726,9 +730,9 @@ public class TestIcebergRecordConverter {
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
byteBuffer.putLong(uuid.getMostSignificantBits());
byteBuffer.putLong(uuid.getLeastSignificantBits());
assertArrayEquals(byteBuffer.array(), resultRecord.get(13, byte[].class));
assertArrayEquals(byteBuffer.array(), resultRecord.get(14, byte[].class));
} else {
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(13, UUID.class));
assertEquals(UUID.fromString("0000-00-00-00-000000"), resultRecord.get(14, UUID.class));
}
}