Bump up Delta Lake Kernel to 3.1.0 (#15842)

This patch bumps the Delta Lake Kernel dependency from 3.0.0 to 3.1.0, which was released last week; see https://github.com/delta-io/delta/releases/tag/v3.1.0 for the release notes.

There were a few "breaking" API changes in 3.1.0; the rationale for some of those changes can be found here.
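
For context, the biggest change this extension adapts to is in the read path: instead of handing the Kernel a list of scan-file rows and getting logical data back in one call, the connector now reads the physical Parquet data for each scan file itself and asks the Kernel to transform it. A minimal before/after sketch, assuming an existing `tableClient`, `scanState`, and `scanFile`; it mirrors the diffs below rather than the full Kernel API surface:

```java
// Kernel 3.0.0 read path previously used by this extension:
CloseableIterator<FilteredColumnarBatch> data = Scan.readData(
    tableClient,
    scanState,
    Utils.toCloseableIterator(scanRowList.iterator()),
    Optional.empty()
);

// Kernel 3.1.0 read path: read the physical Parquet data per scan file, then let the
// Kernel apply the table's logical transforms on top of it.
FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
CloseableIterator<ColumnarBatch> physicalData = tableClient.getParquetHandler().readParquetFiles(
    Utils.singletonCloseableIterator(fileStatus),
    physicalReadSchema,
    Optional.empty()
);
CloseableIterator<FilteredColumnarBatch> logicalData =
    Scan.transformPhysicalData(tableClient, scanState, scanFile, physicalData);
```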

Next up in this extension: add and expose filter predicates.
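
On that note, a rough, hypothetical sketch of what pushing a filter into the scan might look like; `withFilter`, `Predicate`, `Column`, and `Literal` are my reading of the Kernel expressions API, not something this patch adds:

```java
// Hypothetical filter push-down (not part of this patch): prune scan files for "age >= 21".
Predicate ageFilter = new Predicate(">=", new Column("age"), Literal.ofInt(21));
Scan scan = latestSnapshot.getScanBuilder(tableClient)
                          .withReadSchema(tableClient, prunedSchema)
                          .withFilter(tableClient, ageFilter)
                          .build();
```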
Abhishek Radhakrishnan authored on 2024-02-06 07:55:17 -08:00; committed by GitHub
parent 2dc71c7874
commit 1affa35b29
6 changed files with 122 additions and 50 deletions

File: pom.xml

@@ -35,7 +35,7 @@
     <modelVersion>4.0.0</modelVersion>

     <properties>
-        <delta-kernel.version>3.0.0</delta-kernel.version>
+        <delta-kernel.version>3.1.0</delta-kernel.version>
     </properties>

     <dependencies>

File: DeltaInputSource.java

@@ -28,13 +28,17 @@ import io.delta.kernel.Snapshot;
 import io.delta.kernel.Table;
 import io.delta.kernel.TableNotFoundException;
 import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.data.ScanStateRow;
 import io.delta.kernel.internal.util.Utils;
 import io.delta.kernel.types.StructField;
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
 import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
@@ -113,15 +117,19 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
   {
     final TableClient tableClient = createTableClient();
     try {
-      final Row scanState;
-      final List<Row> scanRowList;
+      final List<CloseableIterator<FilteredColumnarBatch>> scanFileDataIters = new ArrayList<>();

       if (deltaSplit != null) {
-        scanState = deserialize(tableClient, deltaSplit.getStateRow());
-        scanRowList = deltaSplit.getFiles()
-                                .stream()
-                                .map(row -> deserialize(tableClient, row))
-                                .collect(Collectors.toList());
+        final Row scanState = deserialize(tableClient, deltaSplit.getStateRow());
+        final StructType physicalReadSchema =
+            ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
+
+        for (String file : deltaSplit.getFiles()) {
+          final Row scanFile = deserialize(tableClient, file);
+          scanFileDataIters.add(
+              getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema)
+          );
+        }
       } else {
         final Table table = Table.forPath(tableClient, tablePath);
         final Snapshot latestSnapshot = table.getLatestSnapshot(tableClient);
@@ -130,24 +138,27 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
             inputRowSchema.getColumnsFilter()
         );
         final Scan scan = latestSnapshot.getScanBuilder(tableClient).withReadSchema(tableClient, prunedSchema).build();
-        final CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(tableClient);
-
-        scanState = scan.getScanState(tableClient);
-        scanRowList = new ArrayList<>();
-
-        while (scanFiles.hasNext()) {
-          final FilteredColumnarBatch scanFileBatch = scanFiles.next();
-          final CloseableIterator<Row> scanFileRows = scanFileBatch.getRows();
-          scanFileRows.forEachRemaining(scanRowList::add);
+        final CloseableIterator<FilteredColumnarBatch> scanFilesIter = scan.getScanFiles(tableClient);
+        final Row scanState = scan.getScanState(tableClient);
+        final StructType physicalReadSchema =
+            ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
+
+        while (scanFilesIter.hasNext()) {
+          final FilteredColumnarBatch scanFileBatch = scanFilesIter.next();
+          final CloseableIterator<Row> scanFileRows = scanFileBatch.getRows();
+
+          while (scanFileRows.hasNext()) {
+            final Row scanFile = scanFileRows.next();
+            scanFileDataIters.add(
+                getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema)
+            );
+          }
         }
       }
+
       return new DeltaInputSourceReader(
-          Scan.readData(
-              tableClient,
-              scanState,
-              Utils.toCloseableIterator(scanRowList.iterator()),
-              Optional.empty()
-          ),
+          scanFileDataIters.iterator(),
           inputRowSchema
       );
     }
@@ -258,4 +269,31 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
       Thread.currentThread().setContextClassLoader(currCtxClassloader);
     }
   }
+
+  /**
+   * Utility to get the transformed data iterator from the scan file. Code borrowed and modified from
+   * <a href="https://github.com/delta-io/delta/blob/master/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/SingleThreadedTableReader.java">
+   * SingleThreadedTableReader.java</a>.
+   */
+  private CloseableIterator<FilteredColumnarBatch> getTransformedDataIterator(
+      final TableClient tableClient,
+      final Row scanState,
+      final Row scanFile,
+      final StructType physicalReadSchema
+  ) throws IOException
+  {
+    final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
+
+    final CloseableIterator<ColumnarBatch> physicalDataIter = tableClient.getParquetHandler().readParquetFiles(
+        Utils.singletonCloseableIterator(fileStatus),
+        physicalReadSchema,
+        Optional.empty()
+    );
+
+    return Scan.transformPhysicalData(
+        tableClient,
+        scanState,
+        scanFile,
+        physicalDataIter
+    );
+  }
 }

File: DeltaInputSourceReader.java

@@ -29,6 +29,7 @@ import org.apache.druid.data.input.InputStats;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;

 import java.io.IOException;
+import java.util.Iterator;
 import java.util.NoSuchElementException;

 /**
@@ -38,28 +39,29 @@ import java.util.NoSuchElementException;
  */
 public class DeltaInputSourceReader implements InputSourceReader
 {
-  private final io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator;
+  private final Iterator<io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch>> filteredColumnarBatchIterators;
   private final InputRowSchema inputRowSchema;

   public DeltaInputSourceReader(
-      io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator,
+      Iterator<io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch>> filteredColumnarBatchIterators,
       InputRowSchema inputRowSchema
   )
   {
-    this.filteredColumnarBatchCloseableIterator = filteredColumnarBatchCloseableIterator;
+    this.filteredColumnarBatchIterators = filteredColumnarBatchIterators;
     this.inputRowSchema = inputRowSchema;
   }

   @Override
   public CloseableIterator<InputRow> read()
   {
-    return new DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator, inputRowSchema);
+    return new DeltaInputSourceIterator(filteredColumnarBatchIterators, inputRowSchema);
   }

   @Override
   public CloseableIterator<InputRow> read(InputStats inputStats)
   {
-    return new DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator, inputRowSchema);
+    return new DeltaInputSourceIterator(filteredColumnarBatchIterators, inputRowSchema);
   }

   @Override
@@ -92,17 +94,17 @@ public class DeltaInputSourceReader implements InputSourceReader
   private static class DeltaInputSourceIterator implements CloseableIterator<InputRow>
   {
-    private final io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator;
+    private final Iterator<io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch>> filteredColumnarBatchIterators;

     private io.delta.kernel.utils.CloseableIterator<Row> currentBatch = null;
     private final InputRowSchema inputRowSchema;

     public DeltaInputSourceIterator(
-        io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator,
+        Iterator<io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch>> filteredColumnarBatchCloseableIterator,
         InputRowSchema inputRowSchema
     )
     {
-      this.filteredColumnarBatchCloseableIterator = filteredColumnarBatchCloseableIterator;
+      this.filteredColumnarBatchIterators = filteredColumnarBatchCloseableIterator;
       this.inputRowSchema = inputRowSchema;
     }
@@ -110,10 +112,19 @@ public class DeltaInputSourceReader implements InputSourceReader
     public boolean hasNext()
     {
       while (currentBatch == null || !currentBatch.hasNext()) {
-        if (!filteredColumnarBatchCloseableIterator.hasNext()) {
+        if (!filteredColumnarBatchIterators.hasNext()) {
           return false; // No more batches or records to read!
         }
-        currentBatch = filteredColumnarBatchCloseableIterator.next().getRows();
+
+        final io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredBatchIterator =
+            filteredColumnarBatchIterators.next();
+
+        while (filteredBatchIterator.hasNext()) {
+          currentBatch = filteredBatchIterator.next().getRows();
+          if (currentBatch.hasNext()) {
+            return true;
+          }
+        }
       }
       return true;
     }
@@ -132,7 +143,13 @@ public class DeltaInputSourceReader implements InputSourceReader
     @Override
     public void close() throws IOException
     {
-      filteredColumnarBatchCloseableIterator.close();
+      if (currentBatch != null) {
+        currentBatch.close();
+      }
+
+      if (filteredColumnarBatchIterators.hasNext()) {
+        filteredColumnarBatchIterators.next().close();
+      }
     }
   }
 }

File: RowSerde.java

@@ -26,7 +26,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.delta.kernel.client.TableClient;
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
-import io.delta.kernel.internal.types.TableSchemaSerDe;
 import io.delta.kernel.internal.util.VectorUtils;
 import io.delta.kernel.types.ArrayType;
 import io.delta.kernel.types.BooleanType;
@@ -73,7 +72,7 @@ public class RowSerde
     Map<String, Object> rowObject = convertRowToJsonObject(row);
     try {
       Map<String, Object> rowWithSchema = new HashMap<>();
-      rowWithSchema.put("schema", TableSchemaSerDe.toJson(row.getSchema()));
+      rowWithSchema.put("schema", row.getSchema().toJson());
       rowWithSchema.put("row", rowObject);
       return OBJECT_MAPPER.writeValueAsString(rowWithSchema);
     }
@@ -91,7 +90,7 @@ public class RowSerde
       JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
       JsonNode schemaNode = jsonNode.get("schema");
       StructType schema =
-          TableSchemaSerDe.fromJson(tableClient.getJsonHandler(), schemaNode.asText());
+          tableClient.getJsonHandler().deserializeStructType(schemaNode.asText());
       return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema);
     }
     catch (JsonProcessingException e) {
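
The other Kernel 3.1.0 API change this extension touches is schema (de)serialization, shown above: `TableSchemaSerDe` is gone from the serde path and the schema round-trips through the types themselves. A minimal sketch, assuming an existing `TableClient tableClient` and a Kernel `StructType schema`:

```java
// Serialize the Kernel schema to JSON and read it back (Kernel 3.1.0 style).
String schemaJson = schema.toJson();
StructType roundTripped = tableClient.getJsonHandler().deserializeStructType(schemaJson);
```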

File: DeltaInputRowTest.java

@@ -22,10 +22,16 @@ package org.apache.druid.delta.input;
 import io.delta.kernel.Scan;
 import io.delta.kernel.TableNotFoundException;
 import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.data.ScanStateRow;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Test;
@@ -42,23 +48,35 @@ public class DeltaInputRowTest
     final TableClient tableClient = DefaultTableClient.create(new Configuration());
     final Scan scan = DeltaTestUtils.getScan(tableClient);

-    CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);
+    final Row scanState = scan.getScanState(tableClient);
+    final StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
+
+    final CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);
     int totalRecordCount = 0;
     while (scanFileIter.hasNext()) {
-      try (CloseableIterator<FilteredColumnarBatch> data =
-               Scan.readData(
-                   tableClient,
-                   scan.getScanState(tableClient),
-                   scanFileIter.next().getRows(),
-                   Optional.empty()
-               )) {
-        while (data.hasNext()) {
-          FilteredColumnarBatch dataReadResult = data.next();
-          Row next = dataReadResult.getRows().next();
-          DeltaInputRow deltaInputRow = new DeltaInputRow(
-              next,
-              DeltaTestUtils.FULL_SCHEMA
-          );
+      final FilteredColumnarBatch scanFileBatch = scanFileIter.next();
+      final CloseableIterator<Row> scanFileRows = scanFileBatch.getRows();
+
+      while (scanFileRows.hasNext()) {
+        final Row scanFile = scanFileRows.next();
+        final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
+
+        final CloseableIterator<ColumnarBatch> physicalDataIter = tableClient.getParquetHandler().readParquetFiles(
+            Utils.singletonCloseableIterator(fileStatus),
+            physicalReadSchema,
+            Optional.empty()
+        );
+        final CloseableIterator<FilteredColumnarBatch> dataIter = Scan.transformPhysicalData(
+            tableClient,
+            scanState,
+            scanFile,
+            physicalDataIter
+        );
+
+        while (dataIter.hasNext()) {
+          FilteredColumnarBatch dataReadResult = dataIter.next();
+          Row next = dataReadResult.getRows().next();
+          DeltaInputRow deltaInputRow = new DeltaInputRow(next, DeltaTestUtils.FULL_SCHEMA);
           Assert.assertNotNull(deltaInputRow);
           Assert.assertEquals(DeltaTestUtils.DIMENSIONS, deltaInputRow.getDimensions());

File: Python requirements (delta-spark / pyspark pins)

@@ -1,2 +1,2 @@
-delta-spark==3.0.0
+delta-spark==3.1.0
 pyspark==3.5.0