From 75937c98e82156d8812143a047baf3f2a8af6b72 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Wed, 29 May 2024 10:46:30 -0700 Subject: [PATCH] Upgrade delta kernel from 3.1.0 to 3.2.0 (#16513) Upstream release: https://github.com/delta-io/delta/releases/tag/v3.2.0 - Upgrade kernel dependency to 3.2.0 - Notable breaking changes introduced in upstream that affect the Druid extension: - Rename TableClient -> Engine - Rename DefaultTableClient -> DefaultEngine - Exceptions moved to a separate package - Table.forPath() doesn't throw TableNotFoundException. Instead, the exception is thrown when getting snapshot info from the Table object --- .../druid-deltalake-extensions/pom.xml | 2 +- .../druid/delta/input/DeltaInputSource.java | 74 +++++++++---------- .../apache/druid/delta/input/RowSerde.java | 7 +- .../druid/delta/input/DeltaInputRowTest.java | 20 ++--- .../druid/delta/input/DeltaTestUtils.java | 16 ++-- .../druid/delta/input/RowSerdeTest.java | 16 ++-- 6 files changed, 67 insertions(+), 68 deletions(-) diff --git a/extensions-contrib/druid-deltalake-extensions/pom.xml b/extensions-contrib/druid-deltalake-extensions/pom.xml index fb5ca5912ba..053f6556573 100644 --- a/extensions-contrib/druid-deltalake-extensions/pom.xml +++ b/extensions-contrib/druid-deltalake-extensions/pom.xml @@ -35,7 +35,7 @@ 4.0.0 - 3.1.0 + 3.2.0 diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java index 56ccd2a41ae..7d126caef34 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java @@ -28,12 +28,12 @@ import io.delta.kernel.Scan; import io.delta.kernel.ScanBuilder; import io.delta.kernel.Snapshot; import 
io.delta.kernel.Table; -import io.delta.kernel.TableNotFoundException; -import io.delta.kernel.client.TableClient; import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; -import io.delta.kernel.defaults.client.DefaultTableClient; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.expressions.Predicate; import io.delta.kernel.internal.InternalScanFileUtils; import io.delta.kernel.internal.data.ScanStateRow; @@ -120,7 +120,7 @@ public class DeltaInputSource implements SplittableInputSource /** * Instantiates a {@link DeltaInputSourceReader} to read the Delta table rows. If a {@link DeltaSplit} is supplied, - * the Delta files and schema are obtained from it to instantiate the reader. Otherwise, a Delta table client is + * the Delta files and schema are obtained from it to instantiate the reader. Otherwise, the Delta engine is * instantiated with the supplied configuration to read the table. 
* * @param inputRowSchema schema for {@link org.apache.druid.data.input.InputRow} @@ -134,40 +134,40 @@ public class DeltaInputSource implements SplittableInputSource File temporaryDirectory ) { - final TableClient tableClient = createTableClient(); + final Engine engine = createDeltaEngine(); try { final List> scanFileDataIters = new ArrayList<>(); if (deltaSplit != null) { - final Row scanState = deserialize(tableClient, deltaSplit.getStateRow()); + final Row scanState = deserialize(engine, deltaSplit.getStateRow()); final StructType physicalReadSchema = - ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState); + ScanStateRow.getPhysicalDataReadSchema(engine, scanState); for (String file : deltaSplit.getFiles()) { - final Row scanFile = deserialize(tableClient, file); + final Row scanFile = deserialize(engine, file); scanFileDataIters.add( - getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema, Optional.empty()) + getTransformedDataIterator(engine, scanState, scanFile, physicalReadSchema, Optional.empty()) ); } } else { - final Table table = Table.forPath(tableClient, tablePath); - final Snapshot latestSnapshot = table.getLatestSnapshot(tableClient); - final StructType fullSnapshotSchema = latestSnapshot.getSchema(tableClient); + final Table table = Table.forPath(engine, tablePath); + final Snapshot latestSnapshot = table.getLatestSnapshot(engine); + final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine); final StructType prunedSchema = pruneSchema( fullSnapshotSchema, inputRowSchema.getColumnsFilter() ); - final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(tableClient); + final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(engine); if (filter != null) { - scanBuilder.withFilter(tableClient, filter.getFilterPredicate(fullSnapshotSchema)); + scanBuilder.withFilter(engine, filter.getFilterPredicate(fullSnapshotSchema)); } - final Scan scan = scanBuilder.withReadSchema(tableClient, 
prunedSchema).build(); - final CloseableIterator scanFilesIter = scan.getScanFiles(tableClient); - final Row scanState = scan.getScanState(tableClient); + final Scan scan = scanBuilder.withReadSchema(engine, prunedSchema).build(); + final CloseableIterator scanFilesIter = scan.getScanFiles(engine); + final Row scanState = scan.getScanState(engine); final StructType physicalReadSchema = - ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState); + ScanStateRow.getPhysicalDataReadSchema(engine, scanState); while (scanFilesIter.hasNext()) { final FilteredColumnarBatch scanFileBatch = scanFilesIter.next(); @@ -176,7 +176,7 @@ public class DeltaInputSource implements SplittableInputSource while (scanFileRows.hasNext()) { final Row scanFile = scanFileRows.next(); scanFileDataIters.add( - getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema, scan.getRemainingFilter()) + getTransformedDataIterator(engine, scanState, scanFile, physicalReadSchema, scan.getRemainingFilter()) ); } } @@ -203,26 +203,26 @@ public class DeltaInputSource implements SplittableInputSource return Stream.of(new InputSplit<>(deltaSplit)); } - final TableClient tableClient = createTableClient(); + final Engine engine = createDeltaEngine(); final Snapshot latestSnapshot; + final Table table = Table.forPath(engine, tablePath); try { - final Table table = Table.forPath(tableClient, tablePath); - latestSnapshot = table.getLatestSnapshot(tableClient); + latestSnapshot = table.getLatestSnapshot(engine); } catch (TableNotFoundException e) { throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath); } - final StructType fullSnapshotSchema = latestSnapshot.getSchema(tableClient); + final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine); - final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(tableClient); + final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(engine); if (filter != null) { - scanBuilder.withFilter(tableClient, 
filter.getFilterPredicate(fullSnapshotSchema)); + scanBuilder.withFilter(engine, filter.getFilterPredicate(fullSnapshotSchema)); } - final Scan scan = scanBuilder.withReadSchema(tableClient, fullSnapshotSchema).build(); + final Scan scan = scanBuilder.withReadSchema(engine, fullSnapshotSchema).build(); // scan files iterator for the current snapshot - final CloseableIterator scanFilesIterator = scan.getScanFiles(tableClient); + final CloseableIterator scanFilesIterator = scan.getScanFiles(engine); - final Row scanState = scan.getScanState(tableClient); + final Row scanState = scan.getScanState(engine); final String scanStateStr = RowSerde.serializeRowToJson(scanState); Iterator deltaSplitIterator = Iterators.transform( @@ -256,9 +256,9 @@ public class DeltaInputSource implements SplittableInputSource ); } - private Row deserialize(TableClient tableClient, String row) + private Row deserialize(Engine engine, String row) { - return RowSerde.deserializeRowFromJson(tableClient, row); + return RowSerde.deserializeRowFromJson(engine, row); } /** @@ -285,17 +285,17 @@ public class DeltaInputSource implements SplittableInputSource } /** - * @return a table client where the client is initialized with {@link Configuration} class that uses the class's + * @return a Delta engine initialized with {@link Configuration} class that uses the class's * class loader instead of the context classloader. The latter by default doesn't know about the extension classes, - * so the table client cannot load runtime classes resulting in {@link ClassNotFoundException}. + * so the Delta engine cannot load runtime classes resulting in {@link ClassNotFoundException}. 
*/ - private TableClient createTableClient() + private Engine createDeltaEngine() { final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); final Configuration conf = new Configuration(); - return DefaultTableClient.create(conf); + return DefaultEngine.create(conf); } finally { Thread.currentThread().setContextClassLoader(currCtxClassloader); @@ -308,7 +308,7 @@ public class DeltaInputSource implements SplittableInputSource * SingleThreadedTableReader.java. */ private CloseableIterator getTransformedDataIterator( - final TableClient tableClient, + final Engine engine, final Row scanState, final Row scanFile, final StructType physicalReadSchema, @@ -317,14 +317,14 @@ public class DeltaInputSource implements SplittableInputSource { final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile); - final CloseableIterator physicalDataIter = tableClient.getParquetHandler().readParquetFiles( + final CloseableIterator physicalDataIter = engine.getParquetHandler().readParquetFiles( Utils.singletonCloseableIterator(fileStatus), physicalReadSchema, optionalPredicate ); return Scan.transformPhysicalData( - tableClient, + engine, scanState, scanFile, physicalDataIter diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java index e37c5d50331..bad6191496d 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java @@ -23,9 +23,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import 
com.fasterxml.jackson.databind.node.ObjectNode; -import io.delta.kernel.client.TableClient; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.internal.data.DefaultJsonRow; +import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.util.VectorUtils; import io.delta.kernel.types.ArrayType; import io.delta.kernel.types.BooleanType; @@ -84,13 +84,12 @@ public class RowSerde /** * Utility method to deserialize a {@link Row} object from the JSON form. */ - public static Row deserializeRowFromJson(TableClient tableClient, String jsonRowWithSchema) + public static Row deserializeRowFromJson(Engine engine, String jsonRowWithSchema) { try { JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema); JsonNode schemaNode = jsonNode.get("schema"); - StructType schema = - tableClient.getJsonHandler().deserializeStructType(schemaNode.asText()); + StructType schema = engine.getJsonHandler().deserializeStructType(schemaNode.asText()); return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema); } catch (JsonProcessingException e) { diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java index 9e597894d05..4e1c2566f02 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java @@ -20,12 +20,12 @@ package org.apache.druid.delta.input; import io.delta.kernel.Scan; -import io.delta.kernel.TableNotFoundException; -import io.delta.kernel.client.TableClient; import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; -import io.delta.kernel.defaults.client.DefaultTableClient; +import 
io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.InternalScanFileUtils; import io.delta.kernel.internal.data.ScanStateRow; import io.delta.kernel.internal.util.Utils; @@ -68,13 +68,13 @@ public class DeltaInputRowTest final List> expectedRows ) throws TableNotFoundException, IOException { - final TableClient tableClient = DefaultTableClient.create(new Configuration()); - final Scan scan = DeltaTestUtils.getScan(tableClient, deltaTablePath); + final Engine engine = DefaultEngine.create(new Configuration()); + final Scan scan = DeltaTestUtils.getScan(engine, deltaTablePath); - final Row scanState = scan.getScanState(tableClient); - final StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState); + final Row scanState = scan.getScanState(engine); + final StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState); - final CloseableIterator scanFileIter = scan.getScanFiles(tableClient); + final CloseableIterator scanFileIter = scan.getScanFiles(engine); int totalRecordCount = 0; while (scanFileIter.hasNext()) { final FilteredColumnarBatch scanFileBatch = scanFileIter.next(); @@ -84,13 +84,13 @@ public class DeltaInputRowTest final Row scanFile = scanFileRows.next(); final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile); - final CloseableIterator physicalDataIter = tableClient.getParquetHandler().readParquetFiles( + final CloseableIterator physicalDataIter = engine.getParquetHandler().readParquetFiles( Utils.singletonCloseableIterator(fileStatus), physicalReadSchema, Optional.empty() ); final CloseableIterator dataIter = Scan.transformPhysicalData( - tableClient, + engine, scanState, scanFile, physicalDataIter diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java 
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java index 96696e7b6a4..6d49428ce02 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java @@ -23,19 +23,19 @@ import io.delta.kernel.Scan; import io.delta.kernel.ScanBuilder; import io.delta.kernel.Snapshot; import io.delta.kernel.Table; -import io.delta.kernel.TableNotFoundException; -import io.delta.kernel.client.TableClient; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.types.StructType; public class DeltaTestUtils { - public static Scan getScan(final TableClient tableClient, final String deltaTablePath) throws TableNotFoundException + public static Scan getScan(final Engine engine, final String deltaTablePath) throws TableNotFoundException { - final Table table = Table.forPath(tableClient, deltaTablePath); - final Snapshot snapshot = table.getLatestSnapshot(tableClient); - final StructType readSchema = snapshot.getSchema(tableClient); - final ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient) - .withReadSchema(tableClient, readSchema); + final Table table = Table.forPath(engine, deltaTablePath); + final Snapshot snapshot = table.getLatestSnapshot(engine); + final StructType readSchema = snapshot.getSchema(engine); + final ScanBuilder scanBuilder = snapshot.getScanBuilder(engine) + .withReadSchema(engine, readSchema); return scanBuilder.build(); } } diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java index fe2b85d2f3f..7ac3eec09ef 100644 --- 
a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java @@ -20,11 +20,11 @@ package org.apache.druid.delta.input; import io.delta.kernel.Scan; -import io.delta.kernel.TableNotFoundException; import io.delta.kernel.data.Row; -import io.delta.kernel.defaults.client.DefaultTableClient; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.exceptions.TableNotFoundException; import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -46,13 +46,13 @@ public class RowSerdeTest @ParameterizedTest(name = "{index}:with context {0}") public void testSerializeDeserializeRoundtrip(final String tablePath) throws TableNotFoundException { - final DefaultTableClient tableClient = DefaultTableClient.create(new Configuration()); - final Scan scan = DeltaTestUtils.getScan(tableClient, tablePath); - final Row scanState = scan.getScanState(tableClient); + final DefaultEngine engine = DefaultEngine.create(new Configuration()); + final Scan scan = DeltaTestUtils.getScan(engine, tablePath); + final Row scanState = scan.getScanState(engine); final String rowJson = RowSerde.serializeRowToJson(scanState); - final Row row = RowSerde.deserializeRowFromJson(tableClient, rowJson); + final Row row = RowSerde.deserializeRowFromJson(engine, rowJson); - Assert.assertEquals(scanState.getSchema(), row.getSchema()); + Assertions.assertEquals(scanState.getSchema(), row.getSchema()); } }