From aa833a711c516e17305bbe585d32ac26c0d314fd Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Mon, 9 Sep 2024 04:42:48 -0400 Subject: [PATCH] Support for reading Delta Lake table snapshots (#17004) Problem Currently, the delta input source only supports reading from the latest snapshot of the given Delta Lake table. This is a known documented limitation. Description Add support for reading Delta snapshot. By default, the Druid-Delta connector reads the latest snapshot of the Delta table in order to preserve compatibility. Users can specify a snapshotVersion to ingest change data events from Delta tables into Druid. In the future, we can also add support for time-based snapshot reads. The Delta API to read time-based snapshots is not clear currently. --- .../extensions-contrib/delta-lake.md | 7 +- docs/ingestion/input-sources.md | 19 +-- .../druid/delta/input/DeltaInputSource.java | 45 ++++-- .../druid/delta/input/DeltaInputRowTest.java | 5 +- .../input/DeltaInputSourceSerdeTest.java | 14 ++ .../delta/input/DeltaInputSourceTest.java | 68 ++++++++- .../druid/delta/input/RowSerdeTest.java | 4 +- .../druid/delta/input/SnapshotDeltaTable.java | 129 ++++++++++++++++++ .../src/test/resources/README.md | 23 +++- .../src/test/resources/create_delta_table.py | 92 +++++++++++-- .../_delta_log/.00000000000000000000.json.crc | Bin 0 -> 24 bytes .../_delta_log/.00000000000000000001.json.crc | Bin 0 -> 16 bytes .../_delta_log/.00000000000000000002.json.crc | Bin 0 -> 20 bytes .../_delta_log/.00000000000000000003.json.crc | Bin 0 -> 16 bytes .../_delta_log/00000000000000000000.json | 6 + .../_delta_log/00000000000000000001.json | 2 + .../_delta_log/00000000000000000002.json | 3 + .../_delta_log/00000000000000000003.json | 3 + ...-a5a8-516e5b35ef44.c000.snappy.parquet.crc | Bin 0 -> 16 bytes ...4856-a5a8-516e5b35ef44.c000.snappy.parquet | Bin 0 -> 869 bytes ...-92a8-92ec51e4bdb9.c000.snappy.parquet.crc | Bin 0 -> 16 bytes ...-81aa-7bc8140b0f09.c000.snappy.parquet.crc | Bin 0 -> 16 bytes ...4603-92a8-92ec51e4bdb9.c000.snappy.parquet | Bin 0 -> 869 bytes ...4a2e-81aa-7bc8140b0f09.c000.snappy.parquet | Bin 0 -> 869 bytes ...-b2ca-db8d7288d345.c000.snappy.parquet.crc | Bin 0 -> 16 bytes ...-b4f1-ab0c2e762044.c000.snappy.parquet.crc | Bin 0 -> 16 bytes ...4c46-b2ca-db8d7288d345.c000.snappy.parquet | Bin 0 -> 869 bytes ...446c-b4f1-ab0c2e762044.c000.snappy.parquet | Bin 0 -> 869 bytes ...-8d6c-dd90dd3db251.c000.snappy.parquet.crc | Bin 0 -> 16 bytes ...4b3d-8d6c-dd90dd3db251.c000.snappy.parquet | Bin 0 -> 869 bytes website/.spelling | 1 + 31 files changed, 364 insertions(+), 57 deletions(-) create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/SnapshotDeltaTable.java create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/_delta_log/.00000000000000000000.json.crc create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/_delta_log/.00000000000000000001.json.crc create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/_delta_log/.00000000000000000002.json.crc create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/_delta_log/.00000000000000000003.json.crc create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/_delta_log/00000000000000000000.json create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/_delta_log/00000000000000000001.json create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/_delta_log/00000000000000000002.json create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/_delta_log/00000000000000000003.json create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=0/.part-00003-8610110f-f5a0-4856-a5a8-516e5b35ef44.c000.snappy.parquet.crc create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=0/part-00003-8610110f-f5a0-4856-a5a8-516e5b35ef44.c000.snappy.parquet create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=1/.part-00004-a614b691-2e73-4603-92a8-92ec51e4bdb9.c000.snappy.parquet.crc create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=1/.part-00006-120df0a3-1c7a-4a2e-81aa-7bc8140b0f09.c000.snappy.parquet.crc create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=1/part-00004-a614b691-2e73-4603-92a8-92ec51e4bdb9.c000.snappy.parquet create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=1/part-00006-120df0a3-1c7a-4a2e-81aa-7bc8140b0f09.c000.snappy.parquet create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=2/.part-00000-bffba8b9-3388-4c46-b2ca-db8d7288d345.c000.snappy.parquet.crc create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=2/.part-00009-246861b8-01b0-446c-b4f1-ab0c2e762044.c000.snappy.parquet.crc create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=2/part-00000-bffba8b9-3388-4c46-b2ca-db8d7288d345.c000.snappy.parquet create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=2/part-00009-246861b8-01b0-446c-b4f1-ab0c2e762044.c000.snappy.parquet create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=4/.part-00009-d3e45f07-7843-4b3d-8d6c-dd90dd3db251.c000.snappy.parquet.crc create mode 100644 extensions-contrib/druid-deltalake-extensions/src/test/resources/snapshot-table/id=4/part-00009-d3e45f07-7843-4b3d-8d6c-dd90dd3db251.c000.snappy.parquet diff --git a/docs/development/extensions-contrib/delta-lake.md b/docs/development/extensions-contrib/delta-lake.md index 503fbfdc55d..88f3a2c77f3 100644 --- a/docs/development/extensions-contrib/delta-lake.md +++ b/docs/development/extensions-contrib/delta-lake.md @@ -51,9 +51,4 @@ java \ -c "org.apache.druid.extensions.contrib:druid-deltalake-extensions:" ``` -See [Loading community extensions](../../configuration/extensions.md#loading-community-extensions) for more information. - -## Known limitations - -This extension relies on the Delta Kernel API and can only read from the latest Delta table snapshot. Ability to read from -arbitrary snapshots is tracked [here](https://github.com/delta-io/delta/issues/2581). \ No newline at end of file +See [Loading community extensions](../../configuration/extensions.md#loading-community-extensions) for more information. \ No newline at end of file diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md index c6ba2e5b49e..71340abc2c0 100644 --- a/docs/ingestion/input-sources.md +++ b/docs/ingestion/input-sources.md @@ -1147,11 +1147,12 @@ To use the Delta Lake input source, load the extension [`druid-deltalake-extensi You can use the Delta input source to read data stored in a Delta Lake table. For a given table, the input source scans the latest snapshot from the configured table. Druid ingests the underlying delta files from the table. -| Property|Description|Required| -|---------|-----------|--------| -| type|Set this value to `delta`.|yes| -| tablePath|The location of the Delta table.|yes| -| filter|The JSON Object that filters data files within a snapshot.|no| +| Property|Description| Default|Required | +|---------|-----------|-----------------| +|type|Set this value to `delta`.| None|yes| +|tablePath|The location of the Delta table.|None|yes| +|filter|The JSON Object that filters data files within a snapshot.|None|no| +|snapshotVersion|The snapshot version to read from the Delta table. An integer value must be specified.|Latest|no| ### Delta filter object @@ -1224,7 +1225,7 @@ filters on partitioned columns. | column | The table column to apply the filter on. | yes | | value | The value to use in the filter. | yes | -The following is a sample spec to read all records from the Delta table `/delta-table/foo`: +The following is a sample spec to read all records from the latest snapshot from Delta table `/delta-table/foo`: ```json ... @@ -1237,7 +1238,8 @@ The following is a sample spec to read all records from the Delta table `/delta- } ``` -The following is a sample spec to read records from the Delta table `/delta-table/foo` to select records where `name = 'Employee4' and age >= 30`: +The following is a sample spec to read records from the Delta table `/delta-table/foo` snapshot version `3` to select records where +`name = 'Employee4' and age >= 30`: ```json ... @@ -1260,7 +1262,8 @@ The following is a sample spec to read records from the Delta table `/delta-tabl "value": "30" } ] - } + }, + "snapshotVersion": 3 }, } ``` diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java index 01a18e9bc85..c4c2f2668b0 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java @@ -67,9 +67,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; /** - * Input source to ingest data from a Delta Lake. This input source reads the latest snapshot from a Delta table - * specified by {@code tablePath} parameter. If {@code filter} is specified, it's used at the Kernel level - * for data pruning. The filtering behavior is as follows: + * Input source to ingest data from a Delta Lake. This input source reads the given {@code snapshotVersion} from a Delta + * table specified by {@code tablePath} parameter, or the latest snapshot if it's not specified. + * If {@code filter} is specified, it's used at the Kernel level for data pruning. The filtering behavior is as follows: *