Handle Delta StructType, ArrayType and MapType (#16884)

Handle the following Delta complex types:
a. StructType as JSON
b. ArrayType as Java list
c. MapType as Java map

Generate and add a new Delta table complex-types-table that contains the above complex types for testing.

Update the tests to include a parameterized test with complex-types-table, with the expectations defined in ComplexTypesDeltaTable.java.
This commit is contained in:
Abhishek Radhakrishnan 2024-08-13 07:50:03 -07:00 committed by GitHub
parent c6da2f30e8
commit acadc2df20
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 235 additions and 6 deletions

View File

@ -19,6 +19,10 @@
package org.apache.druid.delta.input;
import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.BinaryType;
import io.delta.kernel.types.BooleanType;
import io.delta.kernel.types.ByteType;
@ -29,6 +33,7 @@ import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.ShortType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
@ -197,6 +202,15 @@ public class DeltaInputRow implements InputRow
return String.valueOf(charArray);
} else if (dataType instanceof DecimalType) {
return dataRow.getDecimal(columnOrdinal).longValue();
} else if (dataType instanceof StructType) {
final io.delta.kernel.data.Row structRow = dataRow.getStruct(columnOrdinal);
return RowSerde.convertRowToJsonObject(structRow);
} else if (dataType instanceof ArrayType) {
final ArrayValue arrayRow = dataRow.getArray(columnOrdinal);
return VectorUtils.toJavaList(arrayRow);
} else if (dataType instanceof MapType) {
final MapValue map = dataRow.getMap(columnOrdinal);
return VectorUtils.toJavaMap(map);
} else {
throw InvalidInput.exception(
"Unsupported data type[%s] for fieldName[%s].",

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.input;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.AutoTypeColumnSchema;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Refer to extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md to generate the
* sample complex types Delta Lake table used in the unit tests.
*
*/
public class ComplexTypesDeltaTable
{
/**
* The Delta table path used by unit tests.
*/
public static final String DELTA_TABLE_PATH = "src/test/resources/complex-types-table";
/**
* The list of dimensions in the Delta table {@link #DELTA_TABLE_PATH}.
*/
public static final List<String> DIMENSIONS = ImmutableList.of(
"id",
"array_info",
"struct_info",
"nested_struct_info",
"map_info"
);
/**
* The expected set of rows from the first checkpoint file {@code {@link #DELTA_TABLE_PATH}/_delta_log/00000000000000000000.json}
*/
private static final List<Map<String, Object>> SPLIT_0_EXPECTED_ROWS = new ArrayList<>(
ImmutableList.of(
ImmutableMap.of(
"id", 0L,
"array_info", ImmutableList.of(0, 1, 2, 3),
"struct_info", ImmutableMap.of("id", 0L, "name", "0"),
"nested_struct_info", ImmutableMap.of("id", 0L, "name", "0", "nested", ImmutableMap.of("nested_int", 0, "nested_double", 1.0)),
"map_info", ImmutableMap.of("key1", 1.0f, "key2", 1.0f)
),
ImmutableMap.of(
"id", 1L,
"array_info", ImmutableList.of(1, 2, 3, 4),
"struct_info", ImmutableMap.of("id", 1L, "name", "1"),
"nested_struct_info", ImmutableMap.of("id", 1L, "name", "1", "nested", ImmutableMap.of("nested_int", 1, "nested_double", 2.0)),
"map_info", ImmutableMap.of("key1", 2.0f, "key2", 2.0f)
),
ImmutableMap.of(
"id", 2L,
"array_info", ImmutableList.of(2, 3, 4, 5),
"struct_info", ImmutableMap.of("id", 2L, "name", "2"),
"nested_struct_info", ImmutableMap.of("id", 2L, "name", "2", "nested", ImmutableMap.of("nested_int", 2, "nested_double", 3.0)),
"map_info", ImmutableMap.of("key1", 3.0f, "key2", 3.0f)
),
ImmutableMap.of(
"id", 3L,
"array_info", ImmutableList.of(3, 4, 5, 6),
"struct_info", ImmutableMap.of("id", 3L, "name", "3"),
"nested_struct_info", ImmutableMap.of("id", 3L, "name", "3", "nested", ImmutableMap.of("nested_int", 3, "nested_double", 4.0)),
"map_info", ImmutableMap.of("key1", 4.0f, "key2", 4.0f)
),
ImmutableMap.of(
"id", 4L,
"array_info", ImmutableList.of(4, 5, 6, 7),
"struct_info", ImmutableMap.of("id", 4L, "name", "4"),
"nested_struct_info", ImmutableMap.of("id", 4L, "name", "4", "nested", ImmutableMap.of("nested_int", 4, "nested_double", 5.0)),
"map_info", ImmutableMap.of("key1", 5.0f, "key2", 5.0f)
)
)
);
/**
* Mapping of checkpoint file identifier to the list of expected rows in that checkpoint.
*/
public static final Map<Integer, List<Map<String, Object>>> SPLIT_TO_EXPECTED_ROWS = new HashMap<>(
ImmutableMap.of(
0, SPLIT_0_EXPECTED_ROWS
)
);
/**
* Complete set of expected rows across all checkpoint files for {@link #DELTA_TABLE_PATH}.
*/
public static final List<Map<String, Object>> EXPECTED_ROWS = SPLIT_TO_EXPECTED_ROWS.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
/**
* The Druid schema used for ingestion of {@link #DELTA_TABLE_PATH}.
*/
public static final InputRowSchema FULL_SCHEMA = new InputRowSchema(
new TimestampSpec("na", "posix", DateTimes.of("2024-01-01")),
new DimensionsSpec(
ImmutableList.of(
new AutoTypeColumnSchema("id", null),
new AutoTypeColumnSchema("array_info", null),
new AutoTypeColumnSchema("struct_info", null),
new AutoTypeColumnSchema("nested_struct_info", null),
new AutoTypeColumnSchema("map_info", null)
)
),
ColumnsFilter.all()
);
}

View File

@ -54,7 +54,8 @@ public class DeltaInputRowTest
{
Object[][] data = new Object[][]{
{NonPartitionedDeltaTable.DELTA_TABLE_PATH, NonPartitionedDeltaTable.FULL_SCHEMA, NonPartitionedDeltaTable.DIMENSIONS, NonPartitionedDeltaTable.EXPECTED_ROWS},
{PartitionedDeltaTable.DELTA_TABLE_PATH, PartitionedDeltaTable.FULL_SCHEMA, PartitionedDeltaTable.DIMENSIONS, PartitionedDeltaTable.EXPECTED_ROWS}
{PartitionedDeltaTable.DELTA_TABLE_PATH, PartitionedDeltaTable.FULL_SCHEMA, PartitionedDeltaTable.DIMENSIONS, PartitionedDeltaTable.EXPECTED_ROWS},
{ComplexTypesDeltaTable.DELTA_TABLE_PATH, ComplexTypesDeltaTable.FULL_SCHEMA, ComplexTypesDeltaTable.DIMENSIONS, ComplexTypesDeltaTable.EXPECTED_ROWS}
};
return Arrays.asList(data);
}
@ -116,7 +117,7 @@ public class DeltaInputRowTest
}
}
}
Assert.assertEquals(NonPartitionedDeltaTable.EXPECTED_ROWS.size(), totalRecordCount);
Assert.assertEquals(expectedRows.size(), totalRecordCount);
}
@MethodSource("data")

View File

@ -84,6 +84,11 @@ public class DeltaInputSourceTest
PartitionedDeltaTable.DELTA_TABLE_PATH,
PartitionedDeltaTable.FULL_SCHEMA,
PartitionedDeltaTable.EXPECTED_ROWS
},
{
ComplexTypesDeltaTable.DELTA_TABLE_PATH,
ComplexTypesDeltaTable.FULL_SCHEMA,
ComplexTypesDeltaTable.EXPECTED_ROWS
}
};
}

View File

@ -84,3 +84,14 @@ python3 create_delta_table.py --save_path=employee-delta-table-partitioned-name
The resulting Delta table is checked in to the repo. The expectated rows to be used in tests are updated in
`PartitionedDeltaTable.java` accordingly.
### Complex types table `complex-types-table`:
The test data in `resources/complex-types-table` contains 5 Delta records generated with 1 snapshot.
The table was generated by running the following commands:
```shell
python3 create_delta_table.py --save_path=complex-types-table --num_records=5 --gen_complex_types=True
```
The resulting Delta table is checked in to the repo. The expectated rows to be used in tests are updated in
`ComplexTypesDeltaTable.java` accordingly.

View File

@ -0,0 +1,8 @@
{"commitInfo":{"timestamp":1723511561738,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputRows":"5","numOutputBytes":"17937"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0","txnId":"b9eae5f4-d55b-4c38-b365-8228ec09248e"}}
{"metaData":{"id":"ce998219-9bde-4831-b78c-14b11f919fbe","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"array_info\",\"type\":{\"type\":\"array\",\"elementType\":\"integer\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_info\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_struct_info\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"nested_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"map_info\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"float\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1723511559184}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"part-00001-01efecb8-5771-4e91-834e-2a1cb6601eb8-c000.snappy.parquet","partitionValues":{},"size":3288,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0,\"struct_info\":{\"id\":0,\"name\":\"0\"},\"nested_struct_info\":{\"id\":0,\"name\":\"0\",\"nested\":{\"nested_int\":0,\"nested_double\":1.0}}},\"maxValues\":{\"id\":0,\"struct_info\":{\"id\":0,\"name\":\"0\"},\"nested_struct_info\":{\"id\":0,\"name\":\"0\",\"nested\":{\"nested_int\":0,\"nested_double\":1.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00003-383f5a97-c624-4ef3-82a4-f3f273308e53-c000.snappy.parquet","partitionValues":{},"size":3291,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1,\"struct_info\":{\"id\":1,\"name\":\"1\"},\"nested_struct_info\":{\"id\":1,\"name\":\"1\",\"nested\":{\"nested_int\":1,\"nested_double\":2.0}}},\"maxValues\":{\"id\":1,\"struct_info\":{\"id\":1,\"name\":\"1\"},\"nested_struct_info\":{\"id\":1,\"name\":\"1\",\"nested\":{\"nested_int\":1,\"nested_double\":2.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00005-febee455-5e89-404a-bb38-f627c47eb20b-c000.snappy.parquet","partitionValues":{},"size":3289,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2,\"struct_info\":{\"id\":2,\"name\":\"2\"},\"nested_struct_info\":{\"id\":2,\"name\":\"2\",\"nested\":{\"nested_int\":2,\"nested_double\":3.0}}},\"maxValues\":{\"id\":2,\"struct_info\":{\"id\":2,\"name\":\"2\"},\"nested_struct_info\":{\"id\":2,\"name\":\"2\",\"nested\":{\"nested_int\":2,\"nested_double\":3.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00007-07d88387-16f9-4141-bc77-0106e7f28f7a-c000.snappy.parquet","partitionValues":{},"size":3290,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3,\"struct_info\":{\"id\":3,\"name\":\"3\"},\"nested_struct_info\":{\"id\":3,\"name\":\"3\",\"nested\":{\"nested_int\":3,\"nested_double\":4.0}}},\"maxValues\":{\"id\":3,\"struct_info\":{\"id\":3,\"name\":\"3\"},\"nested_struct_info\":{\"id\":3,\"name\":\"3\",\"nested\":{\"nested_int\":3,\"nested_double\":4.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}
{"add":{"path":"part-00009-73760316-7ace-43fe-b605-506c942cd969-c000.snappy.parquet","partitionValues":{},"size":3291,"modificationTime":1723511561689,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4,\"struct_info\":{\"id\":4,\"name\":\"4\"},\"nested_struct_info\":{\"id\":4,\"name\":\"4\",\"nested\":{\"nested_int\":4,\"nested_double\":5.0}}},\"maxValues\":{\"id\":4,\"struct_info\":{\"id\":4,\"name\":\"4\"},\"nested_struct_info\":{\"id\":4,\"name\":\"4\",\"nested\":{\"nested_int\":4,\"nested_double\":5.0}}},\"nullCount\":{\"id\":0,\"array_info\":0,\"struct_info\":{\"id\":0,\"name\":0},\"nested_struct_info\":{\"id\":0,\"name\":0,\"nested\":{\"nested_int\":0,\"nested_double\":0}},\"map_info\":0}}"}}

View File

@ -15,12 +15,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import argparse
from delta import *
import pyspark
from pyspark.sql.types import StructType, StructField, ShortType, StringType, TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType, BooleanType
from pyspark.sql.types import MapType, StructType, StructField, ShortType, StringType, TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType, BooleanType, ArrayType
from datetime import datetime, timedelta
import random
@ -39,6 +37,55 @@ def config_spark_with_delta_lake():
return spark
def create_dataset_with_complex_types(num_records):
"""
Create a mock dataset with records containing complex types like arrays, structs and maps.
Parameters:
- num_records (int): Number of records to generate.
Returns:
- Tuple: A tuple containing a list of records and the corresponding schema.
- List of Records: Each record is a tuple representing a row of data.
- StructType: The schema defining the structure of the records.
Example:
```python
data, schema = create_dataset_with_complex_types(10)
```
"""
schema = StructType([
StructField("id", LongType(), False),
StructField("array_info", ArrayType(IntegerType(), True), True),
StructField("struct_info", StructType([
StructField("id", LongType(), False),
StructField("name", StringType(), True)
])),
StructField("nested_struct_info", StructType([
StructField("id", LongType(), False),
StructField("name", StringType(), True),
StructField("nested", StructType([
StructField("nested_int", IntegerType(), False),
StructField("nested_double", DoubleType(), True),
]))
])),
StructField("map_info", MapType(StringType(), FloatType()))
])
data = []
for idx in range(num_records):
record = (
idx,
(idx, idx + 1, idx + 2, idx + 3),
(idx, f"{idx}"),
(idx, f"{idx}", (idx, idx + 1.0)),
{"key1": idx + 1.0, "key2": idx + 1.0}
)
data.append(record)
return data, schema
def create_dataset(num_records):
"""
Generate a mock employee dataset with different datatypes for testing purposes.
@ -94,6 +141,9 @@ def main():
parser = argparse.ArgumentParser(description="Script to write a Delta Lake table.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--gen_complex_types", type=bool, default=False, help="Generate a Delta table with records"
" containing complex types like structs,"
" maps and arrays.")
parser.add_argument('--save_path', default=None, required=True, help="Save path for Delta table")
parser.add_argument('--save_mode', choices=('append', 'overwrite'), default="append",
help="Specify write mode (append/overwrite)")
@ -103,6 +153,7 @@ def main():
args = parser.parse_args()
is_gen_complex_types = args.gen_complex_types
save_mode = args.save_mode
save_path = args.save_path
num_records = args.num_records
@ -110,7 +161,11 @@ def main():
spark = config_spark_with_delta_lake()
data, schema = create_dataset(num_records=num_records)
if is_gen_complex_types:
data, schema = create_dataset_with_complex_types(num_records=num_records)
else:
data, schema = create_dataset(num_records=num_records)
df = spark.createDataFrame(data, schema=schema)
if not partitioned_by:
df.write.format("delta").mode(save_mode).save(save_path)