Fix decimal type support for the delta input format. (#17376)

The Delta Decimal type wasn't handled correctly in the Druid Delta connector, resulting in the error: Unsupported fieldType[Decimal(4, 2)] for fieldName[price].

There were no tests or existing tables with the Decimal type, so I've updated the existing table, complex-types-table to include this data type.

Note that the Decimal type can only be handled as a double at most in Druid. For a big decimal that cannot fit inside a double, it should be ingested as a string.
This commit is contained in:
Abhishek Radhakrishnan 2024-10-18 08:09:35 -07:00 committed by GitHub
parent 5b09329479
commit a44006c998
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 21 additions and 16 deletions

View File

@ -201,7 +201,7 @@ public class DeltaInputRow implements InputRow
}
return String.valueOf(charArray);
} else if (dataType instanceof DecimalType) {
return dataRow.getDecimal(columnOrdinal).longValue();
return dataRow.getDecimal(columnOrdinal);
} else if (dataType instanceof StructType) {
final io.delta.kernel.data.Row structRow = dataRow.getStruct(columnOrdinal);
return RowSerde.convertRowToJsonObject(structRow);

View File

@ -32,6 +32,7 @@ import io.delta.kernel.types.BooleanType;
import io.delta.kernel.types.ByteType;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.DateType;
import io.delta.kernel.types.DecimalType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
@ -126,6 +127,8 @@ public class RowSerde
value = row.getFloat(fieldId);
} else if (fieldType instanceof DoubleType) {
value = row.getDouble(fieldId);
} else if (fieldType instanceof DecimalType) {
value = row.getDecimal(fieldId);
} else if (fieldType instanceof DateType) {
value = DeltaTimeUtils.getSecondsFromDate(row.getInt(fieldId));
} else if (fieldType instanceof TimestampType) {

View File

@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.AutoTypeColumnSchema;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -66,35 +67,35 @@ public class ComplexTypesDeltaTable
"id", 0L,
"array_info", ImmutableList.of(0, 1, 2, 3),
"struct_info", ImmutableMap.of("id", 0L, "name", "0"),
"nested_struct_info", ImmutableMap.of("id", 0L, "name", "0", "nested", ImmutableMap.of("nested_int", 0, "nested_double", 1.0)),
"nested_struct_info", ImmutableMap.of("id", 0L, "name", "0", "nested", ImmutableMap.of("nested_int", 0, "nested_double", 1.0, "nested_decimal", BigDecimal.valueOf(0.23))),
"map_info", ImmutableMap.of("key1", 1.0f, "key2", 1.0f)
),
ImmutableMap.of(
"id", 1L,
"array_info", ImmutableList.of(1, 2, 3, 4),
"struct_info", ImmutableMap.of("id", 1L, "name", "1"),
"nested_struct_info", ImmutableMap.of("id", 1L, "name", "1", "nested", ImmutableMap.of("nested_int", 1, "nested_double", 2.0)),
"nested_struct_info", ImmutableMap.of("id", 1L, "name", "1", "nested", ImmutableMap.of("nested_int", 1, "nested_double", 2.0, "nested_decimal", BigDecimal.valueOf(1.23))),
"map_info", ImmutableMap.of("key1", 2.0f, "key2", 2.0f)
),
ImmutableMap.of(
"id", 2L,
"array_info", ImmutableList.of(2, 3, 4, 5),
"struct_info", ImmutableMap.of("id", 2L, "name", "2"),
"nested_struct_info", ImmutableMap.of("id", 2L, "name", "2", "nested", ImmutableMap.of("nested_int", 2, "nested_double", 3.0)),
"nested_struct_info", ImmutableMap.of("id", 2L, "name", "2", "nested", ImmutableMap.of("nested_int", 2, "nested_double", 3.0, "nested_decimal", BigDecimal.valueOf(2.23))),
"map_info", ImmutableMap.of("key1", 3.0f, "key2", 3.0f)
),
ImmutableMap.of(
"id", 3L,
"array_info", ImmutableList.of(3, 4, 5, 6),
"struct_info", ImmutableMap.of("id", 3L, "name", "3"),
"nested_struct_info", ImmutableMap.of("id", 3L, "name", "3", "nested", ImmutableMap.of("nested_int", 3, "nested_double", 4.0)),
"nested_struct_info", ImmutableMap.of("id", 3L, "name", "3", "nested", ImmutableMap.of("nested_int", 3, "nested_double", 4.0, "nested_decimal", BigDecimal.valueOf(3.23))),
"map_info", ImmutableMap.of("key1", 4.0f, "key2", 4.0f)
),
ImmutableMap.of(
"id", 4L,
"array_info", ImmutableList.of(4, 5, 6, 7),
"struct_info", ImmutableMap.of("id", 4L, "name", "4"),
"nested_struct_info", ImmutableMap.of("id", 4L, "name", "4", "nested", ImmutableMap.of("nested_int", 4, "nested_double", 5.0)),
"nested_struct_info", ImmutableMap.of("id", 4L, "name", "4", "nested", ImmutableMap.of("nested_int", 4, "nested_double", 5.0, "nested_decimal", BigDecimal.valueOf(4.23))),
"map_info", ImmutableMap.of("key1", 5.0f, "key2", 5.0f)
)
)

View File

@ -1,8 +1,8 @@
{"commitInfo":{"timestamp":1723511561738,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputRows":"5","numOutputBytes":"17937"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0","txnId":"b9eae5f4-d55b-4c38-b365-8228ec09248e"}}
{"metaData":{"id":"ce998219-9bde-4831-b78c-14b11f919fbe","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"array_info\",\"type\":{\"type\":\"array\",\"elementType\":\"integer\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_info\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_struct_info\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"nested_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"map_info\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"float\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1723511559184}}
{"commitInfo":{"timestamp":1729202194177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputRows":"5","numOutputBytes":"19553"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0","txnId":"ff724e89-210e-47dd-a31f-edec6f2c6f6f"}}
{"metaData":{"id":"68614ef0-e5ca-4caa-bbd9-b3427eecadc9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"array_info\",\"type\":{\"type\":\"array\",\"elementType\":\"integer\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_info\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_struct_info\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"nested_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_decimal\",\"type\":\"decimal(4,2)\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"map_info\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"float\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1729202191427}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"part-00001-01efecb8-5771-4e91-834e-2a1cb6601eb8-c000.snappy.parquet","partitionValues":{},"size":3288,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0,\"struct_info\":{\"id\":0,\"name\":\"0\"},\"nested_struct_info\":{\"id\":0,\"name\":\"0\",\"nested\":{\"nested_int\":0,\"nested_double\":1.0}}},\"maxValues\":{\"id\":0,\"struct_info\":{\"id\":0,\"name\":\"0\"},\"nested_struct_info\":{\"id\":0,\"name\":\"0\",\"nested\":{\"nested_int\":0,\"nested_double\":1.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00003-383f5a97-c624-4ef3-82a4-f3f273308e53-c000.snappy.parquet","partitionValues":{},"size":3291,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"struct_info\":{\"id\":1,\"name\":\"1\"},\"nested_struct_info\":{\"id\":1,\"name\":\"1\",\"nested\":{\"nested_int\":1,\"nested_double\":2.0}}},\"maxValues\":{\"id\":1,\"struct_info\":{\"id\":1,\"name\":\"1\"},\"nested_struct_info\":{\"id\":1,\"name\":\"1\",\"nested\":{\"nested_int\":1,\"nested_double\":2.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00005-febee455-5e89-404a-bb38-f627c47eb20b-c000.snappy.parquet","partitionValues":{},"size":3289,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"struct_info\":{\"id\":2,\"name\":\"2\"},\"nested_struct_info\":{\"id\":2,\"name\":\"2\",\"nested\":{\"nested_int\":2,\"nested_double\":3.0}}},\"maxValues\":{\"id\":2,\"struct_info\":{\"id\":2,\"name\":\"2\"},\"nested_struct_info\":{\"id\":2,\"name\":\"2\",\"nested\":{\"nested_int\":2,\"nested_double\":3.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00007-07d88387-16f9-4141-bc77-0106e7f28f7a-c000.snappy.parquet","partitionValues":{},"size":3290,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3,\"struct_info\":{\"id\":3,\"name\":\"3\"},\"nested_struct_info\":{\"id\":3,\"name\":\"3\",\"nested\":{\"nested_int\":3,\"nested_double\":4.0}}},\"maxValues\":{\"id\":3,\"struct_info\":{\"id\":3,\"name\":\"3\"},\"nested_struct_info\":{\"id\":3,\"name\":\"3\",\"nested\":{\"nested_int\":3,\"nested_double\":4.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00009-73760316-7ace-43fe-b605-506c942cd969-c000.snappy.parquet","partitionValues":{},"size":3291,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"struct_info\":{\"id\":4,\"name\":\"4\"},\"nested_struct_info\":{\"id\":4,\"name\":\"4\",\"nested\":{\"nested_int\":4,\"nested_double\":5.0}}},\"maxValues\":{\"id\":4,\"struct_info\":{\"id\":4,\"name\":\"4\"},\"nested_struct_info\":{\"id\":4,\"name\":\"4\",\"nested\":{\"nested_int\":4,\"nested_double\":5.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00001-72cd58ed-7979-4b19-bce2-303feb8f9c66-c000.snappy.parquet","partitionValues":{},"size":3588,"modificationTime":1729202194127,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0,\"struct_info\":{\"id\":0,\"name\":\"0\"},\"nested_struct_info\":{\"id\":0,\"name\":\"0\",\"nested\":{\"nested_int\":0,\"nested_double\":1.0,\"nested_decimal\":0.23}}},\"maxValues\":{\"id\":0,\"struct_info\":{\"id\":0,\"name\":\"0\"},\"nested_struct_info\":{\"id\":0,\"name\":\"0\",\"nested\":{\"nested_int\":0,\"nested_double\":1.0,\"nested_decimal\":0.23}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0,\"nested_decimal\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00003-0a39e5f0-e30e-4b08-abf3-e4f79a9062f5-c000.snappy.parquet","partitionValues":{},"size":3591,"modificationTime":1729202194127,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"struct_info\":{\"id\":1,\"name\":\"1\"},\"nested_struct_info\":{\"id\":1,\"name\":\"1\",\"nested\":{\"nested_int\":1,\"nested_double\":2.0,\"nested_decimal\":1.23}}},\"maxValues\":{\"id\":1,\"struct_info\":{\"id\":1,\"name\":\"1\"},\"nested_struct_info\":{\"id\":1,\"name\":\"1\",\"nested\":{\"nested_int\":1,\"nested_double\":2.0,\"nested_decimal\":1.23}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0,\"nested_decimal\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00005-2f71a6be-840a-427d-ad22-7036bb052433-c000.snappy.parquet","partitionValues":{},"size":3589,"modificationTime":1729202194127,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"struct_info\":{\"id\":2,\"name\":\"2\"},\"nested_struct_info\":{\"id\":2,\"name\":\"2\",\"nested\":{\"nested_int\":2,\"nested_double\":3.0,\"nested_decimal\":2.23}}},\"maxValues\":{\"id\":2,\"struct_info\":{\"id\":2,\"name\":\"2\"},\"nested_struct_info\":{\"id\":2,\"name\":\"2\",\"nested\":{\"nested_int\":2,\"nested_double\":3.0,\"nested_decimal\":2.23}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0,\"nested_decimal\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00007-35186f7d-8e26-4b92-a61f-b9a3e5d8b986-c000.snappy.parquet","partitionValues":{},"size":3590,"modificationTime":1729202194127,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3,\"struct_info\":{\"id\":3,\"name\":\"3\"},\"nested_struct_info\":{\"id\":3,\"name\":\"3\",\"nested\":{\"nested_int\":3,\"nested_double\":4.0,\"nested_decimal\":3.23}}},\"maxValues\":{\"id\":3,\"struct_info\":{\"id\":3,\"name\":\"3\"},\"nested_struct_info\":{\"id\":3,\"name\":\"3\",\"nested\":{\"nested_int\":3,\"nested_double\":4.0,\"nested_decimal\":3.23}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0,\"nested_decimal\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00009-cecabfb6-53c7-4526-abc6-69042b415d87-c000.snappy.parquet","partitionValues":{},"size":3591,"modificationTime":1729202194127,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"struct_info\":{\"id\":4,\"name\":\"4\"},\"nested_struct_info\":{\"id\":4,\"name\":\"4\",\"nested\":{\"nested_int\":4,\"nested_double\":5.0,\"nested_decimal\":4.23}}},\"maxValues\":{\"id\":4,\"struct_info\":{\"id\":4,\"name\":\"4\"},\"nested_struct_info\":{\"id\":4,\"name\":\"4\",\"nested\":{\"nested_int\":4,\"nested_double\":5.0,\"nested_decimal\":4.23}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0,\"nested_decimal\":0}},\"map_info\":0}}"}}

View File

@ -17,10 +17,10 @@
import argparse
from enum import Enum
from decimal import Decimal
from delta import *
import pyspark
from pyspark.sql.types import MapType, StructType, StructField, ShortType, StringType, TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType, BooleanType, ArrayType
from pyspark.sql.types import MapType, StructType, StructField, ShortType, StringType, TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType, BooleanType, ArrayType, DecimalType
from pyspark.sql.functions import expr
from datetime import datetime, timedelta
import random
@ -73,6 +73,7 @@ def create_dataset_with_complex_types(num_records):
StructField("nested", StructType([
StructField("nested_int", IntegerType(), False),
StructField("nested_double", DoubleType(), True),
StructField("nested_decimal", DecimalType(4, 2), True),
]))
])),
StructField("map_info", MapType(StringType(), FloatType()))
@ -85,7 +86,7 @@ def create_dataset_with_complex_types(num_records):
idx,
(idx, idx + 1, idx + 2, idx + 3),
(idx, f"{idx}"),
(idx, f"{idx}", (idx, idx + 1.0)),
(idx, f"{idx}", (idx, idx + 1.0, Decimal(idx + 0.23))),
{"key1": idx + 1.0, "key2": idx + 1.0}
)
data.append(record)