Support for filters in the Druid Delta Lake connector (#16288)

* Delta Lake support for filters.

* Updates

* cleanup comments

* Docs

* Remove Enclosed runner

* Rename

* Cleanup test

* Serde test for the Delta input source and fix jackson annotation.

* Updates and docs.

* Update error messages to be clearer

* Fixes

* Handle NumberFormatException to provide a nicer error message.

* Apply suggestions from code review

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Doc fixes based on feedback

* Yes -> yes in docs; reword slightly.

* Update docs/ingestion/input-sources.md

Co-authored-by: Laksh Singla <lakshsingla@gmail.com>

* Update docs/ingestion/input-sources.md

Co-authored-by: Laksh Singla <lakshsingla@gmail.com>

* Documentation, javadoc and more updates.

* Add an end-to-end test for `not` with an `or` expression.

* Break up `=`, `>`, `>=`, `<`, `<=` into their own types instead of sub-classing.

---------

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>
Co-authored-by: Laksh Singla <lakshsingla@gmail.com>
Abhishek Radhakrishnan 2024-04-29 11:31:36 -07:00 committed by GitHub
parent f8015eb02a
commit 1d7595f3f7
GPG Key ID: B5690EEEBB952194
69 changed files with 2678 additions and 490 deletions

View File

@ -22,16 +22,14 @@ title: "Delta Lake extension"
~ under the License.
-->
## Delta Lake extension
Delta Lake is an open source storage framework that enables building a
Lakehouse architecture with various compute engines. [DeltaLakeInputSource](../../ingestion/input-sources.md#delta-lake-input-source) lets
you ingest data stored in a Delta Lake table into Apache Druid. To use the Delta Lake extension, add the `druid-deltalake-extensions` to the list of loaded extensions.
See [Loading extensions](../../configuration/extensions.md#loading-extensions) for more information.
The Delta input source reads the configured Delta Lake table and extracts all the underlying delta files in the table's latest snapshot.
These Delta Lake files are versioned Parquet files.
The Delta input source reads the configured Delta Lake table and extracts the underlying Delta files in the table's latest snapshot
based on an optional Delta filter. These Delta Lake files are versioned Parquet files.
## Version support
@ -57,5 +55,5 @@ See [Loading community extensions](../../configuration/extensions.md#loading-com
## Known limitations
- This extension relies on the Delta Kernel API and can only read from the latest Delta table snapshot.
- Column filtering isn't supported. The extension reads all columns in the configured table.
This extension relies on the Delta Kernel API and can only read from the latest Delta table snapshot. Ability to read from
arbitrary snapshots is tracked [here](https://github.com/delta-io/delta/issues/2581).

View File

@ -1141,7 +1141,86 @@ To use the Delta Lake input source, load the extension [`druid-deltalake-extensi
You can use the Delta input source to read data stored in a Delta Lake table. For a given table, the input source scans
the latest snapshot from the configured table. Druid ingests the underlying Delta files from the table.
The following is a sample spec:
| Property|Description|Required|
|---------|-----------|--------|
| type|Set this value to `delta`.|yes|
| tablePath|The location of the Delta table.|yes|
| filter|JSON object used to filter data files within a snapshot.|no|
### Delta filter object
You can use these filters to filter out data files from a snapshot, reducing the number of files Druid has to ingest from
a Delta table. This input source provides the following filters: `and`, `or`, `not`, `=`, `>`, `>=`, `<`, `<=`.
When a filter is applied on non-partitioned columns, the filtering is best-effort as the Delta Kernel solely relies
on statistics collected when the non-partitioned table is created. In this scenario, this Druid connector may ingest
data that doesn't match the filter. To guarantee that the Delta Kernel prunes out unnecessary column values, only use
filters on partitioned columns.
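For example, assuming a table partitioned by a hypothetical `country` column, a filter on that column is guaranteed to prune files:

```json
{
  "type": "=",
  "column": "country",
  "value": "US"
}
```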
`and` filter:
| Property | Description | Required |
|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| type | Set this value to `and`. | yes |
| filters | List of Delta filter predicates that get evaluated using logical AND where both conditions need to be true. `and` filter requires two filter predicates. | yes |
`or` filter:
| Property | Description | Required |
|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| type | Set this value to `or`. | yes |
| filters | List of Delta filter predicates that get evaluated using logical OR where only one condition needs to be true. `or` filter requires two filter predicates. | yes |
`not` filter:
| Property | Description | Required |
|----------|---------------------------------------------------------------------------------------------------------------|----------|
| type | Set this value to `not`. | yes |
| filter | The Delta filter predicate that gets evaluated using logical NOT. `not` filter requires one filter predicate. | yes |
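As a sketch, a `not` filter wrapping an `or` predicate might look like the following (the `city` and `state` column names are hypothetical):

```json
{
  "type": "not",
  "filter": {
    "type": "or",
    "filters": [
      { "type": "=", "column": "city", "value": "Paris" },
      { "type": "=", "column": "state", "value": "TX" }
    ]
  }
}
```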
`=` filter:
| Property | Description | Required |
|----------|------------------------------------------|----------|
| type | Set this value to `=`. | yes |
| column | The table column to apply the filter on. | yes |
| value | The value to use in the filter. | yes |
`>` filter:
| Property | Description | Required |
|----------|------------------------------------------|----------|
| type | Set this value to `>`. | yes |
| column | The table column to apply the filter on. | yes |
| value | The value to use in the filter. | yes |
`>=` filter:
| Property | Description | Required |
|----------|------------------------------------------|----------|
| type | Set this value to `>=`. | yes |
| column | The table column to apply the filter on. | yes |
| value | The value to use in the filter. | yes |
`<` filter:
| Property | Description | Required |
|----------|------------------------------------------|----------|
| type | Set this value to `<`. | yes |
| column | The table column to apply the filter on. | yes |
| value | The value to use in the filter. | yes |
`<=` filter:
| Property | Description | Required |
|----------|------------------------------------------|----------|
| type | Set this value to `<=`. | yes |
| column | The table column to apply the filter on. | yes |
| value | The value to use in the filter. | yes |
The following is a sample spec to read all records from the Delta table `/delta-table/foo`:
```json
...
@ -1149,14 +1228,35 @@ The following is a sample spec:
"type": "index_parallel",
"inputSource": {
"type": "delta",
"tablePath": "/delta-table/directory"
"tablePath": "/delta-table/foo"
},
}
}
```
| Property|Description|Required|
|---------|-----------|--------|
| type|Set this value to `delta`.|yes|
| tablePath|The location of the Delta table.|yes|
The following is a sample spec to read records from the Delta table `/delta-table/foo` to select records where `name = 'Employee4' and age >= 30`:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "delta",
"tablePath": "/delta-table/foo",
"filter": {
"type": "and",
"filters": [
{
"type": "=",
"column": "name",
"value": "Employee4"
},
{
"type": ">=",
"column": "age",
"value": "30"
}
]
}
},
}
```

View File

@ -118,11 +118,37 @@
</exclusions>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-migrationsupport</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.delta.kernel.expressions.And;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.InvalidInput;
import java.util.List;
/**
* Druid {@link DeltaFilter} that maps to a canonical {@link And} predicate.
* @implNote currently this filter only allows 2 filter predicates. However, this can be relaxed by recursively
* flattening the filters to allow complex expressions.
*/
public class DeltaAndFilter implements DeltaFilter
{
@JsonProperty
private final List<DeltaFilter> filters;
@JsonCreator
public DeltaAndFilter(@JsonProperty("filters") final List<DeltaFilter> filters)
{
if (filters == null) {
throw InvalidInput.exception(
"Delta and filter requires 2 filter predicates and must be non-empty. None provided."
);
}
if (filters.size() != 2) {
throw InvalidInput.exception(
"Delta and filter requires 2 filter predicates, but provided [%d].",
filters.size()
);
}
this.filters = filters;
}
@Override
public Predicate getFilterPredicate(StructType snapshotSchema)
{
// This is simple for now. We can do a recursive flatten.
return new And(
filters.get(0).getFilterPredicate(snapshotSchema),
filters.get(1).getFilterPredicate(snapshotSchema)
);
}
}
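Given the `and` type name registered for this class in `DeltaFilter`'s `@JsonSubTypes`, a JSON spec like the following should deserialize into a `DeltaAndFilter` (column names here are illustrative; values are strings that get coerced against the snapshot schema):

```json
{
  "type": "and",
  "filters": [
    { "type": ">=", "column": "age", "value": "30" },
    { "type": "<", "column": "age", "value": "50" }
  ]
}
```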

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.InvalidInput;
/**
* Druid {@link DeltaFilter} that maps to a Delta predicate of type = for the supplied column and value.
*/
public class DeltaEqualsFilter implements DeltaFilter
{
@JsonProperty
private final String column;
@JsonProperty
private final String value;
@JsonCreator
public DeltaEqualsFilter(@JsonProperty("column") final String column, @JsonProperty("value") final String value)
{
if (column == null) {
throw InvalidInput.exception("column is a required field for = filter.");
}
if (value == null) {
throw InvalidInput.exception(
"value is a required field for = filter. None provided for column[%s].", column
);
}
this.column = column;
this.value = value;
}
@Override
public Predicate getFilterPredicate(StructType snapshotSchema)
{
return new Predicate(
"=",
ImmutableList.of(
new Column(column),
DeltaFilterUtils.dataTypeToLiteral(snapshotSchema, column, value)
)
);
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
/**
* Druid filters that translate to the underlying Delta Kernel {@link Predicate}s. Implementations should
* provide an expression tree syntax to provide more flexibility to users.
*
* <p>
* A user-facing Druid {@link DeltaFilter} should be translated to a canonical Delta Kernel {@link Predicate}.
* Implementations should provide this one-to-one translation.
* </p>
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "=", value = DeltaEqualsFilter.class),
@JsonSubTypes.Type(name = ">", value = DeltaGreaterThanFilter.class),
@JsonSubTypes.Type(name = ">=", value = DeltaGreaterThanOrEqualsFilter.class),
@JsonSubTypes.Type(name = "<", value = DeltaLessThanFilter.class),
@JsonSubTypes.Type(name = "<=", value = DeltaLessThanOrEqualsFilter.class),
@JsonSubTypes.Type(name = "and", value = DeltaAndFilter.class),
@JsonSubTypes.Type(name = "or", value = DeltaOrFilter.class),
@JsonSubTypes.Type(name = "not", value = DeltaNotFilter.class),
})
public interface DeltaFilter
{
/**
* Return a Delta predicate expression. The {@code snapshotSchema} should be used to perform any validations
* and derive sub-expressions to be used in the resulting {@link Predicate}.
*/
Predicate getFilterPredicate(StructType snapshotSchema);
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.DateType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.ShortType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.InvalidInput;
import java.sql.Date;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
public class DeltaFilterUtils
{
/**
* @return a Delta typed literal with the type of value inferred from the snapshot schema. The column must
* be present in the supplied snapshot schema.
*/
static Literal dataTypeToLiteral(
final StructType snapshotSchema,
final String column,
final String value
)
{
if (!snapshotSchema.fieldNames().contains(column)) {
throw InvalidInput.exception(
"column[%s] doesn't exist in schema[%s]", column, snapshotSchema
);
}
final StructField structField = snapshotSchema.get(column);
final DataType dataType = structField.getDataType();
try {
if (dataType instanceof StringType) {
return Literal.ofString(value);
} else if (dataType instanceof IntegerType) {
return Literal.ofInt(Integer.parseInt(value));
} else if (dataType instanceof ShortType) {
return Literal.ofShort(Short.parseShort(value));
} else if (dataType instanceof LongType) {
return Literal.ofLong(Long.parseLong(value));
} else if (dataType instanceof FloatType) {
return Literal.ofFloat(Float.parseFloat(value));
} else if (dataType instanceof DoubleType) {
return Literal.ofDouble(Double.parseDouble(value));
} else if (dataType instanceof DateType) {
final Date dataVal = Date.valueOf(value);
final int daysSinceEpoch = (int) ChronoUnit.DAYS.between(
LocalDate.ofEpochDay(0), dataVal.toLocalDate()
);
return Literal.ofDate(daysSinceEpoch);
} else {
throw InvalidInput.exception(
"Unsupported data type[%s] for column[%s] with value[%s].",
dataType, column, value
);
}
}
catch (NumberFormatException e) {
throw InvalidInput.exception(
"column[%s] has an invalid value[%s]. The value must be a number, as the column's data type is [%s].",
column, value, dataType
);
}
}
}
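The date branch above converts a `java.sql.Date` into days since the Unix epoch. A minimal, self-contained sketch of that conversion, assuming the filter value uses the `yyyy-[m]m-[d]d` form that `Date.valueOf` expects:

```java
import java.sql.Date;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class EpochDaysExample
{
  public static void main(String[] args)
  {
    // Date.valueOf parses the yyyy-[m]m-[d]d form, matching the parsing in dataTypeToLiteral.
    final Date dateVal = Date.valueOf("1970-01-11");
    // Count whole days between the Unix epoch and the parsed date.
    final int daysSinceEpoch = (int) ChronoUnit.DAYS.between(
        LocalDate.ofEpochDay(0), dateVal.toLocalDate()
    );
    System.out.println(daysSinceEpoch); // 10 days after the Unix epoch
  }
}
```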

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.InvalidInput;
/**
* Druid {@link DeltaFilter} that maps to a Delta predicate of type > for the supplied column and value.
*/
public class DeltaGreaterThanFilter implements DeltaFilter
{
@JsonProperty
private final String column;
@JsonProperty
private final String value;
@JsonCreator
public DeltaGreaterThanFilter(@JsonProperty("column") final String column, @JsonProperty("value") final String value)
{
if (column == null) {
throw InvalidInput.exception("column is a required field for > filter.");
}
if (value == null) {
throw InvalidInput.exception(
"value is a required field for > filter. None provided for column[%s].", column
);
}
this.column = column;
this.value = value;
}
@Override
public Predicate getFilterPredicate(StructType snapshotSchema)
{
return new Predicate(
">",
ImmutableList.of(
new Column(column),
DeltaFilterUtils.dataTypeToLiteral(snapshotSchema, column, value)
)
);
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.InvalidInput;
/**
* Druid {@link DeltaFilter} that maps to a Delta predicate of type >= for the supplied column and value.
*/
public class DeltaGreaterThanOrEqualsFilter implements DeltaFilter
{
@JsonProperty
private final String column;
@JsonProperty
private final String value;
@JsonCreator
public DeltaGreaterThanOrEqualsFilter(@JsonProperty("column") final String column, @JsonProperty("value") final String value)
{
if (column == null) {
throw InvalidInput.exception("column is a required field for >= filter.");
}
if (value == null) {
throw InvalidInput.exception(
"value is a required field for >= filter. None provided for column[%s].", column
);
}
this.column = column;
this.value = value;
}
@Override
public Predicate getFilterPredicate(StructType snapshotSchema)
{
return new Predicate(
">=",
ImmutableList.of(
new Column(column),
DeltaFilterUtils.dataTypeToLiteral(snapshotSchema, column, value)
)
);
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.InvalidInput;
/**
* Druid {@link DeltaFilter} that maps to a Delta predicate of type < for the supplied column and value.
*/
public class DeltaLessThanFilter implements DeltaFilter
{
@JsonProperty
private final String column;
@JsonProperty
private final String value;
@JsonCreator
public DeltaLessThanFilter(@JsonProperty("column") final String column, @JsonProperty("value") final String value)
{
if (column == null) {
throw InvalidInput.exception("column is a required field for < filter.");
}
if (value == null) {
throw InvalidInput.exception(
"value is a required field for < filter. None provided for column[%s].", column
);
}
this.column = column;
this.value = value;
}
@Override
public Predicate getFilterPredicate(StructType snapshotSchema)
{
return new Predicate(
"<",
ImmutableList.of(
new Column(column),
DeltaFilterUtils.dataTypeToLiteral(snapshotSchema, column, value)
)
);
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.InvalidInput;
/**
* Druid {@link DeltaFilter} that maps to a Delta predicate of type <= for the supplied column and value.
*/
public class DeltaLessThanOrEqualsFilter implements DeltaFilter
{
@JsonProperty
private final String column;
@JsonProperty
private final String value;
@JsonCreator
public DeltaLessThanOrEqualsFilter(@JsonProperty("column") final String column, @JsonProperty("value") final String value)
{
if (column == null) {
throw InvalidInput.exception("column is a required field for <= filter.");
}
if (value == null) {
throw InvalidInput.exception(
"value is a required field for <= filter. None provided for column[%s].", column
);
}
this.column = column;
this.value = value;
}
@Override
public Predicate getFilterPredicate(StructType snapshotSchema)
{
return new Predicate(
"<=",
ImmutableList.of(
new Column(column),
DeltaFilterUtils.dataTypeToLiteral(snapshotSchema, column, value)
)
);
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.InvalidInput;
/**
* Druid {@link DeltaFilter} that maps to a canonical NOT {@link Predicate}.
*/
public class DeltaNotFilter implements DeltaFilter
{
@JsonProperty
private final DeltaFilter filter;
@JsonCreator
public DeltaNotFilter(@JsonProperty("filter") final DeltaFilter filter)
{
if (filter == null) {
throw InvalidInput.exception(
"Delta not filter requires 1 filter predicate and must be non-empty. None provided."
);
}
this.filter = filter;
}
@Override
public Predicate getFilterPredicate(StructType snapshotSchema)
{
return new Predicate(
"NOT",
filter.getFilterPredicate(snapshotSchema)
);
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.delta.kernel.expressions.Or;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.InvalidInput;
import java.util.List;
/**
* Druid {@link DeltaFilter} that maps to a canonical {@link Or} predicate.
* @implNote currently this filter only allows 2 filter predicates. However, this can be relaxed by recursively
* flattening the filters to allow complex expressions.
*/
public class DeltaOrFilter implements DeltaFilter
{
@JsonProperty
private final List<DeltaFilter> filters;
@JsonCreator
public DeltaOrFilter(@JsonProperty("filters") final List<DeltaFilter> filters)
{
if (filters == null) {
throw InvalidInput.exception(
"Delta or filter requires 2 filter predicates and must be non-empty. None provided."
);
}
if (filters.size() != 2) {
throw InvalidInput.exception(
"Delta or filter requires 2 filter predicates, but provided [%d].",
filters.size()
);
}
this.filters = filters;
}
@Override
public Predicate getFilterPredicate(StructType snapshotSchema)
{
// This is simple for now. We can do a recursive flatten.
return new Or(
filters.get(0).getFilterPredicate(snapshotSchema),
filters.get(1).getFilterPredicate(snapshotSchema)
);
}
}
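Per the validation above, an `or` spec must supply exactly two predicates. A hypothetical example, reusing the employee-style column names from the docs:

```json
{
  "type": "or",
  "filters": [
    { "type": "=", "column": "name", "value": "Employee1" },
    { "type": "=", "column": "name", "value": "Employee2" }
  ]
}
```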

View File

@ -21,9 +21,11 @@ package org.apache.druid.delta.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;
import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
@ -32,6 +34,7 @@ import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.internal.util.Utils;
@ -47,6 +50,7 @@ import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.delta.filter.DeltaFilter;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.utils.Streams;
import org.apache.hadoop.conf.Configuration;
@ -62,11 +66,20 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Input source to ingest data from a Delta Lake.
* This input source reads the latest snapshot from a Delta table specified by {@code tablePath} parameter.
* Input source to ingest data from a Delta Lake. This input source reads the latest snapshot from a Delta table
* specified by {@code tablePath} parameter. If {@code filter} is specified, it's used at the Kernel level
* for data pruning. The filtering behavior is as follows:
* <ul>
* <li> When a filter is applied on a partitioned table using the partitioning columns, the filtering is guaranteed. </li>
* <li> When a filter is applied on non-partitioned columns, the filtering is best-effort as the Delta
* Kernel solely relies on statistics collected when the non-partitioned table is created. In this scenario, this input
* source connector may ingest data that doesn't match the filter. </li>
* </ul>
* <p>
* We leverage the Delta Kernel APIs to interact with a Delta table. The Kernel API abstracts away the
* complexities of the Delta protocol itself.
* Note: currently, the Kernel table API only supports reading from the latest snapshot.
* </p>
*/
public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
{
@ -79,10 +92,15 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
@Nullable
private final DeltaSplit deltaSplit;
@JsonProperty
@Nullable
private final DeltaFilter filter;
@JsonCreator
public DeltaInputSource(
@JsonProperty("tablePath") String tablePath,
@JsonProperty("deltaSplit") @Nullable DeltaSplit deltaSplit
@JsonProperty("tablePath") final String tablePath,
@JsonProperty("deltaSplit") @Nullable final DeltaSplit deltaSplit,
@JsonProperty("filter") @Nullable final DeltaFilter filter
)
{
if (tablePath == null) {
@ -90,6 +108,7 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
}
this.tablePath = tablePath;
this.deltaSplit = deltaSplit;
this.filter = filter;
}
@Override
@ -127,17 +146,23 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
for (String file : deltaSplit.getFiles()) {
final Row scanFile = deserialize(tableClient, file);
scanFileDataIters.add(
getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema)
getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema, Optional.empty())
);
}
} else {
final Table table = Table.forPath(tableClient, tablePath);
final Snapshot latestSnapshot = table.getLatestSnapshot(tableClient);
final StructType fullSnapshotSchema = latestSnapshot.getSchema(tableClient);
final StructType prunedSchema = pruneSchema(
latestSnapshot.getSchema(tableClient),
fullSnapshotSchema,
inputRowSchema.getColumnsFilter()
);
final Scan scan = latestSnapshot.getScanBuilder(tableClient).withReadSchema(tableClient, prunedSchema).build();
final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(tableClient);
if (filter != null) {
scanBuilder.withFilter(tableClient, filter.getFilterPredicate(fullSnapshotSchema));
}
final Scan scan = scanBuilder.withReadSchema(tableClient, prunedSchema).build();
final CloseableIterator<FilteredColumnarBatch> scanFilesIter = scan.getScanFiles(tableClient);
final Row scanState = scan.getScanState(tableClient);
@ -151,7 +176,7 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
while (scanFileRows.hasNext()) {
final Row scanFile = scanFileRows.next();
scanFileDataIters.add(
getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema)
getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema, scan.getRemainingFilter())
);
}
}
@ -187,7 +212,13 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
catch (TableNotFoundException e) {
throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
}
final Scan scan = latestSnapshot.getScanBuilder(tableClient).build();
final StructType fullSnapshotSchema = latestSnapshot.getSchema(tableClient);
final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(tableClient);
if (filter != null) {
scanBuilder.withFilter(tableClient, filter.getFilterPredicate(fullSnapshotSchema));
}
final Scan scan = scanBuilder.withReadSchema(tableClient, fullSnapshotSchema).build();
// scan files iterator for the current snapshot
final CloseableIterator<FilteredColumnarBatch> scanFilesIterator = scan.getScanFiles(tableClient);
@ -220,7 +251,8 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
{
return new DeltaInputSource(
tablePath,
split.get()
split.get(),
filter
);
}
@ -279,7 +311,8 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
final TableClient tableClient,
final Row scanState,
final Row scanFile,
final StructType physicalReadSchema
final StructType physicalReadSchema,
final Optional<Predicate> optionalPredicate
) throws IOException
{
final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
@ -287,8 +320,9 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
final CloseableIterator<ColumnarBatch> physicalDataIter = tableClient.getParquetHandler().readParquetFiles(
Utils.singletonCloseableIterator(fileStatus),
physicalReadSchema,
Optional.empty()
optionalPredicate
);
return Scan.transformPhysicalData(
tableClient,
scanState,
@ -296,4 +330,16 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
physicalDataIter
);
}
@VisibleForTesting
String getTablePath()
{
return tablePath;
}
@VisibleForTesting
DeltaFilter getFilter()
{
return filter;
}
}
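Putting the pieces together, the `filter` property accepted by the constructor above is a JSON tree built from the filter types exercised in the tests that follow (`=`, `>`, `>=`, `<`, `<=`, `and`, `or`, `not`). A hypothetical spec, with the table path and column names made up for illustration:

```json
{
  "type": "delta",
  "tablePath": "s3://my-bucket/employees",
  "filter": {
    "type": "and",
    "filters": [
      { "type": ">=", "column": "age", "value": "18" },
      { "type": "=", "column": "name", "value": "Employee1" }
    ]
  }
}
```

Per the javadoc above, a clause on a partitioning column prunes files exactly, while clauses on non-partitioned columns are best-effort and may still let non-matching rows through to ingestion.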


@ -45,7 +45,6 @@ public class DeltaInputSourceReader implements InputSourceReader
public DeltaInputSourceReader(
Iterator<io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch>> filteredColumnarBatchIterators,
InputRowSchema inputRowSchema
)
{
this.filteredColumnarBatchIterators = filteredColumnarBatchIterators;
@ -120,7 +119,8 @@ public class DeltaInputSourceReader implements InputSourceReader
filteredColumnarBatchIterators.next();
while (filteredBatchIterator.hasNext()) {
currentBatch = filteredBatchIterator.next().getRows();
final FilteredColumnarBatch nextBatch = filteredBatchIterator.next();
currentBatch = nextBatch.getRows();
if (currentBatch.hasNext()) {
return true;
}


@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import io.delta.kernel.expressions.And;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.java.util.common.StringUtils;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
public class DeltaAndFilterTest
{
private static final StructType SCHEMA = new StructType()
.add(new StructField("name", StringType.STRING, true))
.add(new StructField("age", LongType.LONG, false))
.add(new StructField("bar", StringType.STRING, true));
@Test
public void testAndFilter()
{
DeltaAndFilter andFilter = new DeltaAndFilter(
Arrays.asList(
new DeltaEqualsFilter("name", "Employee1"),
new DeltaGreaterThanOrEqualsFilter("age", "8")
)
);
Predicate predicate = andFilter.getFilterPredicate(SCHEMA);
Assert.assertTrue(predicate instanceof And);
Assert.assertEquals(2, predicate.getChildren().size());
}
@Test
public void testAndFilterWithInvalidColumn()
{
DeltaAndFilter andFilter = new DeltaAndFilter(
Arrays.asList(
new DeltaEqualsFilter("name2", "Employee1"),
new DeltaGreaterThanOrEqualsFilter("age", "8")
)
);
MatcherAssert.assertThat(
Assert.assertThrows(DruidException.class, () -> andFilter.getFilterPredicate(SCHEMA)),
DruidExceptionMatcher.invalidInput().expectMessageIs(
StringUtils.format("column[name2] doesn't exist in schema[%s]", SCHEMA)
)
);
}
@Test
public void testAndFilterWithNoFilterPredicates()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new DeltaAndFilter(null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Delta and filter requires 2 filter predicates and must be non-empty. None provided."
)
);
}
@Test
public void testAndFilterWithOneFilterPredicate()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new DeltaAndFilter(
Collections.singletonList(
new DeltaEqualsFilter("name", "Employee1")
)
)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Delta and filter requires 2 filter predicates, but provided [1]."
)
);
}
}


@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.DateType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.ShortType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
public class DeltaEqualsFilterTest
{
private static final StructType SCHEMA = new StructType()
.add(new StructField("str_col", StringType.STRING, true))
.add(new StructField("int_col", IntegerType.INTEGER, true))
.add(new StructField("short_col", ShortType.SHORT, true))
.add(new StructField("long_col", LongType.LONG, false))
.add(new StructField("float_col", FloatType.FLOAT, true))
.add(new StructField("double_col", DoubleType.DOUBLE, true))
.add(new StructField("date_col", DateType.DATE, true));
@Test
public void testEqualsFilter()
{
DeltaEqualsFilter eqFilter = new DeltaEqualsFilter("str_col", "Employee1");
Predicate predicate = eqFilter.getFilterPredicate(SCHEMA);
Assert.assertEquals("=", predicate.getName());
Assert.assertEquals(2, predicate.getChildren().size());
}
@Test
public void testFilterWithNullColumn()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new DeltaEqualsFilter(null, "Employee1")
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"column is a required field for = filter."
)
);
}
@Test
public void testFilterWithNullValue()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new DeltaEqualsFilter("str_col", null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"value is a required field for = filter. None provided for column[str_col]."
)
);
}
@Test
public void testFilterWithInvalidNumericValue()
{
DeltaEqualsFilter eqFilter = new DeltaEqualsFilter("long_col", "twentyOne");
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> eqFilter.getFilterPredicate(SCHEMA)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"column[long_col] has an invalid value[twentyOne]. The value must be a number, as the column's data type is [long]."
)
);
}
}
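The last test above pins down the error message produced when a numeric column receives a non-numeric value. A simplified, self-contained sketch of the kind of type-directed coercion that can produce it (the real filter classes resolve the type from the Delta Kernel `StructType`; the method name, signature, and exception type here are assumptions):

```java
public class ValueCoercionSketch
{
  // Coerce the JSON string value to the column's data type, turning a raw
  // NumberFormatException into a clearer, user-facing error message.
  static Object coerce(String dataType, String column, String value)
  {
    try {
      switch (dataType) {
        case "short":
          return Short.parseShort(value);
        case "integer":
          return Integer.parseInt(value);
        case "long":
          return Long.parseLong(value);
        case "float":
          return Float.parseFloat(value);
        case "double":
          return Double.parseDouble(value);
        default:
          return value; // strings pass through; this sketch leaves dates out
      }
    }
    catch (NumberFormatException e) {
      throw new IllegalArgumentException(
          String.format(
              "column[%s] has an invalid value[%s]. The value must be a number, as the column's data type is [%s].",
              column, value, dataType
          )
      );
    }
  }
}
```

The same coercion path is shared by all of the comparison filters, which is why `DeltaEqualsFilter` and `DeltaLessThanOrEqualsFilter` report the identical message in their tests.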


@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.DateType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.ShortType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import org.junit.Assert;
import org.junit.Test;
public class DeltaGreaterThanFilterTest
{
private static final StructType SCHEMA = new StructType()
.add(new StructField("str_col", StringType.STRING, true))
.add(new StructField("int_col", IntegerType.INTEGER, true))
.add(new StructField("short_col", ShortType.SHORT, true))
.add(new StructField("long_col", LongType.LONG, false))
.add(new StructField("float_col", FloatType.FLOAT, true))
.add(new StructField("double_col", DoubleType.DOUBLE, true))
.add(new StructField("date_col", DateType.DATE, true));
@Test
public void testGreaterThanFilter()
{
DeltaGreaterThanFilter gtFilter = new DeltaGreaterThanFilter("int_col", "123");
Predicate predicate = gtFilter.getFilterPredicate(SCHEMA);
Assert.assertEquals(">", predicate.getName());
Assert.assertEquals(2, predicate.getChildren().size());
}
}


@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.DateType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.ShortType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import org.junit.Assert;
import org.junit.Test;
public class DeltaGreaterThanOrEqualsFilterTest
{
private static final StructType SCHEMA = new StructType()
.add(new StructField("str_col", StringType.STRING, true))
.add(new StructField("int_col", IntegerType.INTEGER, true))
.add(new StructField("short_col", ShortType.SHORT, true))
.add(new StructField("long_col", LongType.LONG, false))
.add(new StructField("float_col", FloatType.FLOAT, true))
.add(new StructField("double_col", DoubleType.DOUBLE, true))
.add(new StructField("date_col", DateType.DATE, true));
@Test
public void testGreaterThanOrEqualsFilter()
{
DeltaGreaterThanOrEqualsFilter gteFilter = new DeltaGreaterThanOrEqualsFilter("long_col", "1234343232323");
Predicate predicate = gteFilter.getFilterPredicate(SCHEMA);
Assert.assertEquals(">=", predicate.getName());
Assert.assertEquals(2, predicate.getChildren().size());
}
}


@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.DateType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.ShortType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import org.junit.Assert;
import org.junit.Test;
public class DeltaLessThanFilterTest
{
private static final StructType SCHEMA = new StructType()
.add(new StructField("str_col", StringType.STRING, true))
.add(new StructField("int_col", IntegerType.INTEGER, true))
.add(new StructField("short_col", ShortType.SHORT, true))
.add(new StructField("long_col", LongType.LONG, false))
.add(new StructField("float_col", FloatType.FLOAT, true))
.add(new StructField("double_col", DoubleType.DOUBLE, true))
.add(new StructField("date_col", DateType.DATE, true));
@Test
public void testLessThanFilter()
{
DeltaLessThanFilter ltFilter = new DeltaLessThanFilter("double_col", "123.2323");
Predicate predicate = ltFilter.getFilterPredicate(SCHEMA);
Assert.assertEquals("<", predicate.getName());
Assert.assertEquals(2, predicate.getChildren().size());
}
}


@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.DateType;
import io.delta.kernel.types.DoubleType;
import io.delta.kernel.types.FloatType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.ShortType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
public class DeltaLessThanOrEqualsFilterTest
{
private static final StructType SCHEMA = new StructType()
.add(new StructField("str_col", StringType.STRING, true))
.add(new StructField("int_col", IntegerType.INTEGER, true))
.add(new StructField("short_col", ShortType.SHORT, true))
.add(new StructField("long_col", LongType.LONG, false))
.add(new StructField("float_col", FloatType.FLOAT, true))
.add(new StructField("double_col", DoubleType.DOUBLE, true))
.add(new StructField("date_col", DateType.DATE, true));
@Test
public void testLessThanOrEqualsFilter()
{
DeltaLessThanOrEqualsFilter lteFilter = new DeltaLessThanOrEqualsFilter("date_col", "2024-01-01");
Predicate predicate = lteFilter.getFilterPredicate(SCHEMA);
Assert.assertEquals("<=", predicate.getName());
Assert.assertEquals(2, predicate.getChildren().size());
}
@Test
public void testFilterWithInvalidNumericValue()
{
DeltaLessThanOrEqualsFilter lteFilter = new DeltaLessThanOrEqualsFilter("long_col", "twentyOne");
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> lteFilter.getFilterPredicate(SCHEMA)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"column[long_col] has an invalid value[twentyOne]. The value must be a number, as the column's data type is [long]."
)
);
}
}


@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.java.util.common.StringUtils;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class DeltaNotFilterTest
{
private static final StructType SCHEMA = new StructType()
.add(new StructField("name", StringType.STRING, true))
.add(new StructField("age", LongType.LONG, false))
.add(new StructField("bar", StringType.STRING, true));
@Test
public void testNotFilterWithEqualsExpression()
{
DeltaEqualsFilter equalsFilter = new DeltaEqualsFilter(
"name",
"Employee1"
);
DeltaNotFilter notFilter = new DeltaNotFilter(equalsFilter);
Predicate predicate = notFilter.getFilterPredicate(SCHEMA);
Assert.assertEquals("NOT", predicate.getName());
Assert.assertEquals(1, predicate.getChildren().size());
}
@Test
public void testNotFilterWithAndExpression()
{
DeltaAndFilter andFilter = new DeltaAndFilter(
Arrays.asList(
new DeltaEqualsFilter(
"name",
"Employee1"
),
new DeltaEqualsFilter(
"name",
"Employee2"
)
)
);
DeltaNotFilter notFilter = new DeltaNotFilter(andFilter);
Predicate predicate = notFilter.getFilterPredicate(SCHEMA);
Assert.assertEquals("NOT", predicate.getName());
Assert.assertEquals(1, predicate.getChildren().size());
}
@Test
public void testNotFilterWithInvalidColumn()
{
DeltaEqualsFilter equalsFilter = new DeltaEqualsFilter(
"name2",
"Employee1"
);
DeltaNotFilter notFilter = new DeltaNotFilter(equalsFilter);
MatcherAssert.assertThat(
Assert.assertThrows(DruidException.class, () -> notFilter.getFilterPredicate(SCHEMA)),
DruidExceptionMatcher.invalidInput().expectMessageIs(
StringUtils.format("column[name2] doesn't exist in schema[%s]", SCHEMA)
)
);
}
@Test
public void testNotFilterWithNoFilterPredicates()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new DeltaNotFilter(null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Delta not filter requires 1 filter predicate and must be non-empty. None provided."
)
);
}
}


@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.filter;
import io.delta.kernel.expressions.Or;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.java.util.common.StringUtils;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
public class DeltaOrFilterTest
{
private static final StructType SCHEMA = new StructType()
.add(new StructField("name", StringType.STRING, true))
.add(new StructField("age", LongType.LONG, false))
.add(new StructField("bar", StringType.STRING, true));
@Test
public void testOrFilter()
{
DeltaOrFilter orFilter = new DeltaOrFilter(
Arrays.asList(
new DeltaEqualsFilter("name", "Employee1"),
new DeltaLessThanOrEqualsFilter("age", "8")
)
);
Predicate predicate = orFilter.getFilterPredicate(SCHEMA);
Assert.assertTrue(predicate instanceof Or);
Assert.assertEquals(2, predicate.getChildren().size());
}
@Test
public void testOrFilterWithInvalidColumn()
{
DeltaOrFilter orFilter = new DeltaOrFilter(
Arrays.asList(
new DeltaEqualsFilter("name2", "Employee1"),
new DeltaLessThanOrEqualsFilter("age", "8")
)
);
MatcherAssert.assertThat(
Assert.assertThrows(DruidException.class, () -> orFilter.getFilterPredicate(SCHEMA)),
DruidExceptionMatcher.invalidInput().expectMessageIs(
StringUtils.format("column[name2] doesn't exist in schema[%s]", SCHEMA)
)
);
}
@Test
public void testOrFilterWithNoFilterPredicates()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new DeltaOrFilter(null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Delta or filter requires 2 filter predicates and must be non-empty. None provided."
)
);
}
@Test
public void testOrFilterWithOneFilterPredicate()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new DeltaOrFilter(
Collections.singletonList(
new DeltaEqualsFilter("name", "Employee1")
)
)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Delta or filter requires 2 filter predicates, but provided [1]."
)
);
}
}


@ -32,21 +32,44 @@ import io.delta.kernel.internal.util.Utils;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.hadoop.conf.Configuration;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class DeltaInputRowTest
{
@Test
public void testDeltaInputRow() throws TableNotFoundException, IOException
public static Collection<Object[]> data()
{
Object[][] data = new Object[][]{
{NonPartitionedDeltaTable.DELTA_TABLE_PATH, NonPartitionedDeltaTable.FULL_SCHEMA, NonPartitionedDeltaTable.DIMENSIONS, NonPartitionedDeltaTable.EXPECTED_ROWS},
{PartitionedDeltaTable.DELTA_TABLE_PATH, PartitionedDeltaTable.FULL_SCHEMA, PartitionedDeltaTable.DIMENSIONS, PartitionedDeltaTable.EXPECTED_ROWS}
};
return Arrays.asList(data);
}
@MethodSource("data")
@ParameterizedTest(name = "{index}: with table path {0}")
public void testDeltaInputRow(
final String deltaTablePath,
final InputRowSchema schema,
final List<String> dimensions,
final List<Map<String, Object>> expectedRows
) throws TableNotFoundException, IOException
{
final TableClient tableClient = DefaultTableClient.create(new Configuration());
final Scan scan = DeltaTestUtils.getScan(tableClient);
final Scan scan = DeltaTestUtils.getScan(tableClient, deltaTablePath);
final Row scanState = scan.getScanState(tableClient);
final StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
@ -76,13 +99,13 @@ public class DeltaInputRowTest
while (dataIter.hasNext()) {
FilteredColumnarBatch dataReadResult = dataIter.next();
Row next = dataReadResult.getRows().next();
DeltaInputRow deltaInputRow = new DeltaInputRow(next, DeltaTestUtils.FULL_SCHEMA);
DeltaInputRow deltaInputRow = new DeltaInputRow(next, schema);
Assert.assertNotNull(deltaInputRow);
Assert.assertEquals(DeltaTestUtils.DIMENSIONS, deltaInputRow.getDimensions());
Assert.assertEquals(dimensions, deltaInputRow.getDimensions());
Map<String, Object> expectedRow = DeltaTestUtils.EXPECTED_ROWS.get(totalRecordCount);
Map<String, Object> expectedRow = expectedRows.get(totalRecordCount);
for (String key : expectedRow.keySet()) {
if (DeltaTestUtils.FULL_SCHEMA.getTimestampSpec().getTimestampColumn().equals(key)) {
if (schema.getTimestampSpec().getTimestampColumn().equals(key)) {
final long expectedMillis = ((Long) expectedRow.get(key)) * 1000;
Assert.assertEquals(expectedMillis, deltaInputRow.getTimestampFromEpoch());
} else {
@ -93,6 +116,23 @@ public class DeltaInputRowTest
}
}
}
Assert.assertEquals(DeltaTestUtils.EXPECTED_ROWS.size(), totalRecordCount);
Assert.assertEquals(expectedRows.size(), totalRecordCount);
}
@Test
public void testReadNonExistentTable()
{
final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null, null);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> deltaInputSource.reader(null, null, null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"tablePath[non-existent-table] not found."
)
);
}
}


@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.input;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import org.apache.druid.delta.common.DeltaLakeDruidModule;
import org.apache.druid.delta.filter.DeltaAndFilter;
import org.apache.druid.delta.filter.DeltaLessThanFilter;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;
public class DeltaInputSourceSerdeTest
{
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper()
.registerModules(new DeltaLakeDruidModule().getJacksonModules());
@Test
public void testDeltaInputSourceDeserializationWithNoFilter() throws JsonProcessingException
{
final String payload = "{\n"
+ " \"type\": \"delta\",\n"
+ " \"tablePath\": \"foo/bar\"\n"
+ " }";
final DeltaInputSource deltaInputSource = OBJECT_MAPPER.readValue(payload, DeltaInputSource.class);
Assert.assertEquals("foo/bar", deltaInputSource.getTablePath());
Assert.assertNull(deltaInputSource.getFilter());
}
@Test
public void testDeltaInputSourceDeserializationWithLessThanFilter() throws JsonProcessingException
{
final String payload = "{\n"
+ " \"type\": \"delta\",\n"
+ " \"tablePath\": \"foo/bar\",\n"
+ " \"filter\": {\n"
+ " \"type\": \"<\",\n"
+ " \"column\": \"age\",\n"
+ " \"value\": \"20\"\n"
+ " }\n"
+ " }";
final DeltaInputSource deltaInputSource = OBJECT_MAPPER.readValue(payload, DeltaInputSource.class);
Assert.assertEquals("foo/bar", deltaInputSource.getTablePath());
Assert.assertTrue(deltaInputSource.getFilter() instanceof DeltaLessThanFilter);
}
@Test
public void testDeltaInputSourceDeserializationWithAndFilter() throws JsonProcessingException
{
final String payload = "{\n"
+ " \"type\": \"delta\",\n"
+ " \"tablePath\": \"s3://foo/bar/baz\",\n"
+ " \"filter\": {\n"
+ " \"type\": \"and\",\n"
+ " \"filters\": [\n"
+ " {\n"
+ " \"type\": \"<=\",\n"
+ " \"column\": \"age\",\n"
+ " \"value\": \"30\"\n"
+ " },\n"
+ " {\n"
+ " \"type\": \">=\",\n"
+ " \"column\": \"name\",\n"
+ " \"value\": \"Employee4\"\n"
+ " }\n"
+ " ]\n"
+ " }\n"
+ " }";
final DeltaInputSource deltaInputSource = OBJECT_MAPPER.readValue(payload, DeltaInputSource.class);
Assert.assertEquals("s3://foo/bar/baz", deltaInputSource.getTablePath());
Assert.assertTrue(deltaInputSource.getFilter() instanceof DeltaAndFilter);
}
@Test
public void testDeltaInputSourceDeserializationWithNoTablePath()
{
final String payload = "{\n"
+ " \"type\": \"delta\",\n"
+ " \"filter\": {\n"
+ " \"type\": \"<\",\n"
+ " \"column\": \"age\",\n"
+ " \"value\": \"20\"\n"
+ " }\n"
+ " }";
final ValueInstantiationException exception = Assert.assertThrows(
ValueInstantiationException.class,
() -> OBJECT_MAPPER.readValue(payload, DeltaInputSource.class)
);
Assert.assertTrue(
exception.getCause().getMessage().contains(
"tablePath cannot be null."
)
);
}
@Test
public void testDeltaInputSourceDeserializationWithNoFilterColumn()
{
final String payload = "{\n"
+ " \"type\": \"delta\",\n"
+ " \"tablePath\": \"foo/bar\",\n"
+ " \"filter\": {\n"
+ " \"type\": \">=\",\n"
+ " \"value\": \"20\"\n"
+ " }\n"
+ " }";
final ValueInstantiationException exception = Assert.assertThrows(
ValueInstantiationException.class,
() -> OBJECT_MAPPER.readValue(payload, DeltaInputSource.class)
);
Assert.assertEquals(
"column is a required field for >= filter.",
exception.getCause().getMessage()
);
}
}
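Pieced together from the serde cases above, a complete `delta` input source spec with a composed filter looks like this (the `tablePath` is a placeholder taken from the test payloads):

```json
{
  "type": "delta",
  "tablePath": "s3://foo/bar/baz",
  "filter": {
    "type": "and",
    "filters": [
      { "type": "<=", "column": "age", "value": "30" },
      { "type": ">=", "column": "name", "value": "Employee4" }
    ]
  }
}
```

The `type` field on each filter is the Jackson discriminator that selects among the `=`, `>`, `>=`, `<`, `<=`, `and`, `or`, and `not` filter classes; `column` and `value` are required on every leaf filter, as the failure-case tests verify.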

View File

@@ -23,7 +23,14 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.delta.filter.DeltaAndFilter;
import org.apache.druid.delta.filter.DeltaEqualsFilter;
import org.apache.druid.delta.filter.DeltaFilter;
import org.apache.druid.delta.filter.DeltaGreaterThanFilter;
import org.apache.druid.delta.filter.DeltaGreaterThanOrEqualsFilter;
import org.apache.druid.delta.filter.DeltaLessThanOrEqualsFilter;
import org.apache.druid.delta.filter.DeltaNotFilter;
import org.apache.druid.delta.filter.DeltaOrFilter;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.java.util.common.DateTimes;
@@ -32,11 +39,15 @@ import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class DeltaInputSourceTest
@@ -47,169 +58,311 @@ public class DeltaInputSourceTest
System.setProperty("user.timezone", "UTC");
}
@Test
public void testSampleDeltaTable() throws IOException
@RunWith(Parameterized.class)
public static class TablePathParameterTests
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
final InputSourceReader inputSourceReader = deltaInputSource.reader(DeltaTestUtils.FULL_SCHEMA, null, null);
@Parameterized.Parameters
public static Object[][] data()
{
return new Object[][]{
{
NonPartitionedDeltaTable.DELTA_TABLE_PATH,
NonPartitionedDeltaTable.FULL_SCHEMA,
NonPartitionedDeltaTable.EXPECTED_ROWS
},
{
NonPartitionedDeltaTable.DELTA_TABLE_PATH,
NonPartitionedDeltaTable.SCHEMA_1,
NonPartitionedDeltaTable.EXPECTED_ROWS
},
{
NonPartitionedDeltaTable.DELTA_TABLE_PATH,
NonPartitionedDeltaTable.SCHEMA_2,
NonPartitionedDeltaTable.EXPECTED_ROWS
},
{
PartitionedDeltaTable.DELTA_TABLE_PATH,
PartitionedDeltaTable.FULL_SCHEMA,
PartitionedDeltaTable.EXPECTED_ROWS
}
};
}
List<InputRowListPlusRawValues> actualSampledRows = sampleAllRows(inputSourceReader);
Assert.assertEquals(DeltaTestUtils.EXPECTED_ROWS.size(), actualSampledRows.size());
@Parameterized.Parameter(0)
public String deltaTablePath;
@Parameterized.Parameter(1)
public InputRowSchema schema;
@Parameterized.Parameter(2)
public List<Map<String, Object>> expectedRows;
for (int idx = 0; idx < DeltaTestUtils.EXPECTED_ROWS.size(); idx++) {
Map<String, Object> expectedRow = DeltaTestUtils.EXPECTED_ROWS.get(idx);
InputRowListPlusRawValues actualSampledRow = actualSampledRows.get(idx);
Assert.assertNull(actualSampledRow.getParseException());
@Test
public void testSampleDeltaTable() throws IOException
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, null);
final InputSourceReader inputSourceReader = deltaInputSource.reader(schema, null, null);
Map<String, Object> actualSampledRawVals = actualSampledRow.getRawValues();
Assert.assertNotNull(actualSampledRawVals);
Assert.assertNotNull(actualSampledRow.getRawValuesList());
Assert.assertEquals(1, actualSampledRow.getRawValuesList().size());
List<InputRowListPlusRawValues> actualSampledRows = sampleAllRows(inputSourceReader);
Assert.assertEquals(expectedRows.size(), actualSampledRows.size());
for (String key : expectedRow.keySet()) {
if (DeltaTestUtils.FULL_SCHEMA.getTimestampSpec().getTimestampColumn().equals(key)) {
final long expectedMillis = (Long) expectedRow.get(key);
Assert.assertEquals(expectedMillis, actualSampledRawVals.get(key));
} else {
Assert.assertEquals(expectedRow.get(key), actualSampledRawVals.get(key));
for (int idx = 0; idx < expectedRows.size(); idx++) {
Map<String, Object> expectedRow = expectedRows.get(idx);
InputRowListPlusRawValues actualSampledRow = actualSampledRows.get(idx);
Assert.assertNull(actualSampledRow.getParseException());
Map<String, Object> actualSampledRawVals = actualSampledRow.getRawValues();
Assert.assertNotNull(actualSampledRawVals);
Assert.assertNotNull(actualSampledRow.getRawValuesList());
Assert.assertEquals(1, actualSampledRow.getRawValuesList().size());
for (String key : expectedRow.keySet()) {
if (!schema.getColumnsFilter().apply(key)) {
Assert.assertNull(actualSampledRawVals.get(key));
} else {
if (schema.getTimestampSpec().getTimestampColumn().equals(key)) {
final long expectedMillis = (Long) expectedRow.get(key);
Assert.assertEquals(expectedMillis, actualSampledRawVals.get(key));
} else {
Assert.assertEquals(expectedRow.get(key), actualSampledRawVals.get(key));
}
}
}
}
}
}
@Test
public void testReadAllDeltaTable() throws IOException
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
final InputSourceReader inputSourceReader = deltaInputSource.reader(
DeltaTestUtils.FULL_SCHEMA,
null,
null
);
final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.FULL_SCHEMA);
}
@Test
public void testReadAllDeltaTableSubSchema1() throws IOException
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
final InputSourceReader inputSourceReader = deltaInputSource.reader(
DeltaTestUtils.SCHEMA_1,
null,
null
);
final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.SCHEMA_1);
}
@Test
public void testReadAllDeltaTableWithSubSchema2() throws IOException
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
final InputSourceReader inputSourceReader = deltaInputSource.reader(
DeltaTestUtils.SCHEMA_2,
null,
null
);
final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.SCHEMA_2);
}
@Test
public void testDeltaLakeWithCreateSplits()
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
final List<InputSplit<DeltaSplit>> splits = deltaInputSource.createSplits(null, null)
.collect(Collectors.toList());
Assert.assertEquals(DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.size(), splits.size());
for (InputSplit<DeltaSplit> split : splits) {
final DeltaSplit deltaSplit = split.get();
final DeltaInputSource deltaInputSourceWithSplit = new DeltaInputSource(
DeltaTestUtils.DELTA_TABLE_PATH,
deltaSplit
);
List<InputSplit<DeltaSplit>> splitsResult = deltaInputSourceWithSplit.createSplits(null, null)
.collect(Collectors.toList());
Assert.assertEquals(1, splitsResult.size());
Assert.assertEquals(deltaSplit, splitsResult.get(0).get());
@Test
public void testReadDeltaTable() throws IOException
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, null);
final InputSourceReader inputSourceReader = deltaInputSource.reader(schema, null, null);
final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
validateRows(expectedRows, actualReadRows, schema);
}
}
@Test
public void testDeltaLakeWithReadSplits() throws IOException
@RunWith(Parameterized.class)
public static class FilterParameterTests
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
final List<InputSplit<DeltaSplit>> splits = deltaInputSource.createSplits(null, null)
.collect(Collectors.toList());
Assert.assertEquals(DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.size(), splits.size());
@Parameterized.Parameters
public static Object[][] data()
{
return new Object[][]{
{
PartitionedDeltaTable.DELTA_TABLE_PATH,
new DeltaEqualsFilter("name", "Employee2"),
PartitionedDeltaTable.FULL_SCHEMA,
filterExpectedRows(
PartitionedDeltaTable.EXPECTED_ROWS,
row -> row.get("name").equals("Employee2")
)
},
{
PartitionedDeltaTable.DELTA_TABLE_PATH,
new DeltaGreaterThanFilter("name", "Employee3"),
PartitionedDeltaTable.FULL_SCHEMA,
filterExpectedRows(
PartitionedDeltaTable.EXPECTED_ROWS,
row -> ((String) row.get("name")).compareTo("Employee3") > 0
)
},
{
PartitionedDeltaTable.DELTA_TABLE_PATH,
new DeltaLessThanOrEqualsFilter("name", "Employee4"),
PartitionedDeltaTable.FULL_SCHEMA,
filterExpectedRows(
PartitionedDeltaTable.EXPECTED_ROWS,
row -> ((String) row.get("name")).compareTo("Employee4") <= 0
)
},
{
PartitionedDeltaTable.DELTA_TABLE_PATH,
new DeltaAndFilter(
Arrays.asList(
new DeltaEqualsFilter("name", "Employee1"),
new DeltaEqualsFilter("name", "Employee4")
)
),
PartitionedDeltaTable.FULL_SCHEMA,
filterExpectedRows(
PartitionedDeltaTable.EXPECTED_ROWS,
row -> row.get("name").equals("Employee1") && row.get("name").equals("Employee4")
)
},
{
PartitionedDeltaTable.DELTA_TABLE_PATH,
new DeltaOrFilter(
Arrays.asList(
new DeltaEqualsFilter("name", "Employee5"),
new DeltaEqualsFilter("name", "Employee1")
)
),
PartitionedDeltaTable.FULL_SCHEMA,
filterExpectedRows(
PartitionedDeltaTable.EXPECTED_ROWS,
row -> row.get("name").equals("Employee5") || row.get("name").equals("Employee1")
)
},
{
PartitionedDeltaTable.DELTA_TABLE_PATH,
new DeltaNotFilter(
new DeltaOrFilter(
Arrays.asList(
new DeltaEqualsFilter("name", "Employee5"),
new DeltaEqualsFilter("name", "Employee1")
)
)
),
PartitionedDeltaTable.FULL_SCHEMA,
filterExpectedRows(
PartitionedDeltaTable.EXPECTED_ROWS,
row -> !(row.get("name").equals("Employee5") || row.get("name").equals("Employee1"))
)
},
{
PartitionedDeltaTable.DELTA_TABLE_PATH,
new DeltaNotFilter(
new DeltaAndFilter(
Arrays.asList(
new DeltaEqualsFilter("name", "Employee1"),
new DeltaEqualsFilter("name", "Employee4")
)
)
),
PartitionedDeltaTable.FULL_SCHEMA,
filterExpectedRows(
PartitionedDeltaTable.EXPECTED_ROWS,
row -> (!(row.get("name").equals("Employee1") && row.get("name").equals("Employee4")))
)
},
{
PartitionedDeltaTable.DELTA_TABLE_PATH,
new DeltaNotFilter(
new DeltaOrFilter(
Arrays.asList(
new DeltaEqualsFilter("name", "Employee1"),
new DeltaGreaterThanOrEqualsFilter("name", "Employee4")
)
)
),
PartitionedDeltaTable.FULL_SCHEMA,
filterExpectedRows(
PartitionedDeltaTable.EXPECTED_ROWS,
row -> (!(row.get("name").equals("Employee1") || ((String) row.get("name")).compareTo("Employee4") >= 0))
)
}
};
}
for (int idx = 0; idx < splits.size(); idx++) {
final InputSplit<DeltaSplit> split = splits.get(idx);
final DeltaSplit deltaSplit = split.get();
final DeltaInputSource deltaInputSourceWithSplit = new DeltaInputSource(
DeltaTestUtils.DELTA_TABLE_PATH,
deltaSplit
);
final InputSourceReader inputSourceReader = deltaInputSourceWithSplit.reader(
DeltaTestUtils.FULL_SCHEMA,
null,
null
);
final List<InputRow> actualRowsInSplit = readAllRows(inputSourceReader);
final List<Map<String, Object>> expectedRowsInSplit = DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.get(idx);
validateRows(expectedRowsInSplit, actualRowsInSplit, DeltaTestUtils.FULL_SCHEMA);
@Parameterized.Parameter(0)
public String deltaTablePath;
@Parameterized.Parameter(1)
public DeltaFilter filter;
@Parameterized.Parameter(2)
public InputRowSchema schema;
@Parameterized.Parameter(3)
public List<Map<String, Object>> expectedRows;
@Test
public void testSampleDeltaTable() throws IOException
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, filter);
final InputSourceReader inputSourceReader = deltaInputSource.reader(schema, null, null);
List<InputRowListPlusRawValues> actualSampledRows = sampleAllRows(inputSourceReader);
Assert.assertEquals(expectedRows.size(), actualSampledRows.size());
for (int idx = 0; idx < expectedRows.size(); idx++) {
Map<String, Object> expectedRow = expectedRows.get(idx);
InputRowListPlusRawValues actualSampledRow = actualSampledRows.get(idx);
Assert.assertNull(actualSampledRow.getParseException());
Map<String, Object> actualSampledRawVals = actualSampledRow.getRawValues();
Assert.assertNotNull(actualSampledRawVals);
Assert.assertNotNull(actualSampledRow.getRawValuesList());
Assert.assertEquals(1, actualSampledRow.getRawValuesList().size());
for (String key : expectedRow.keySet()) {
if (!schema.getColumnsFilter().apply(key)) {
Assert.assertNull(actualSampledRawVals.get(key));
} else {
if (schema.getTimestampSpec().getTimestampColumn().equals(key)) {
final long expectedMillis = (Long) expectedRow.get(key);
Assert.assertEquals(expectedMillis, actualSampledRawVals.get(key));
} else {
Assert.assertEquals(expectedRow.get(key), actualSampledRawVals.get(key));
}
}
}
}
}
private static List<Map<String, Object>> filterExpectedRows(
final List<Map<String, Object>> rows,
final Predicate<Map<String, Object>> filter
)
{
return rows.stream().filter(filter).collect(Collectors.toList());
}
@Test
public void testReadDeltaTable() throws IOException
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, filter);
final InputSourceReader inputSourceReader = deltaInputSource.reader(schema, null, null);
final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
validateRows(expectedRows, actualReadRows, schema);
}
}
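The `filterExpectedRows` helper above mirrors each `DeltaFilter` case with a plain `java.util.function.Predicate`. A self-contained sketch of the same composition semantics (this models the test's predicates, not the actual Druid `DeltaFilter` API; class and method names are illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class DeltaFilterSketch {
    // Rows standing in for PartitionedDeltaTable.EXPECTED_ROWS (names only).
    static final List<Map<String, Object>> ROWS = List.of(
        Map.of("name", "Employee1"),
        Map.of("name", "Employee4"),
        Map.of("name", "Employee5")
    );

    // Leaf filter: equality on one column, analogous to DeltaEqualsFilter.
    static Predicate<Map<String, Object>> eq(String column, String value) {
        return row -> value.equals(row.get(column));
    }

    static long count(Predicate<Map<String, Object>> filter) {
        return ROWS.stream().filter(filter).count();
    }

    public static void main(String[] args) {
        Predicate<Map<String, Object>> isE1 = eq("name", "Employee1");
        Predicate<Map<String, Object>> isE4 = eq("name", "Employee4");

        // AND of two equality filters on the same column never matches a row.
        System.out.println(count(isE1.and(isE4)));          // 0
        // OR of the same two filters matches both rows.
        System.out.println(count(isE1.or(isE4)));           // 2
        // NOT(OR) keeps only the remaining row.
        System.out.println(count(isE1.or(isE4).negate()));  // 1
    }
}
```

This is why the `DeltaAndFilter` case over `Employee1` and `Employee4` expects an empty result set: the two equality predicates are contradictory on the same column.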
@Test
public void testNullTable()
public static class InvalidInputTests
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new DeltaInputSource(null, null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"tablePath cannot be null."
)
);
@Test
public void testNullTable()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new DeltaInputSource(null, null, null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"tablePath cannot be null."
)
);
}
@Test
public void testSplitNonExistentTable()
{
final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null, null);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> deltaInputSource.createSplits(null, null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"tablePath[non-existent-table] not found."
)
);
}
@Test
public void testReadNonExistentTable()
{
final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null, null);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> deltaInputSource.reader(null, null, null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"tablePath[non-existent-table] not found."
)
);
}
}
@Test
public void testSplitNonExistentTable()
{
final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> deltaInputSource.createSplits(null, null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"tablePath[non-existent-table] not found."
)
);
}
@Test
public void testReadNonExistentTable()
{
final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> deltaInputSource.reader(null, null, null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"tablePath[non-existent-table] not found."
)
);
}
private List<InputRowListPlusRawValues> sampleAllRows(InputSourceReader reader) throws IOException
private static List<InputRowListPlusRawValues> sampleAllRows(InputSourceReader reader) throws IOException
{
List<InputRowListPlusRawValues> rows = new ArrayList<>();
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
@@ -218,7 +371,7 @@ public class DeltaInputSourceTest
return rows;
}
private List<InputRow> readAllRows(InputSourceReader reader) throws IOException
private static List<InputRow> readAllRows(InputSourceReader reader) throws IOException
{
final List<InputRow> rows = new ArrayList<>();
try (CloseableIterator<InputRow> iterator = reader.read()) {
@@ -227,7 +380,7 @@ public class DeltaInputSourceTest
return rows;
}
private void validateRows(
private static void validateRows(
final List<Map<String, Object>> expectedRows,
final List<InputRow> actualReadRows,
final InputRowSchema schema

View File

@@ -19,9 +19,6 @@
package org.apache.druid.delta.input;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
@@ -29,290 +26,12 @@ import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.types.StructType;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Refer to extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md to generate the
* sample Delta Lake table used in the unit tests.
*/
public class DeltaTestUtils
{
/**
* The Delta table path used by unit tests.
*/
public static final String DELTA_TABLE_PATH = "src/test/resources/employee-delta-table";
/**
* The list of dimensions in the Delta table {@link #DELTA_TABLE_PATH}.
*/
public static final List<String> DIMENSIONS = ImmutableList.of(
"id",
"birthday",
"name",
"age",
"salary",
"bonus",
"yoe",
"is_fulltime",
"last_vacation_time"
);
/**
* The expected set of rows from the first checkpoint file {@code DELTA_TABLE_PATH/_delta_log/00000000000000000000.json}
*/
private static final List<Map<String, Object>> SPLIT_0_EXPECTED_ROWS = new ArrayList<>(
ImmutableList.of(
ImmutableMap.of(
"birthday", 1057881600L,
"name", "Employee1",
"id", 867799346L,
"salary", 87642.55209817083,
"age", (short) 20,
"yoe", 4
),
ImmutableMap.of(
"birthday", 1035417600L,
"is_fulltime", false,
"name", "Employee2",
"id", 9963151889L,
"salary", 79404.63969727767,
"age", (short) 21,
"yoe", 2
),
ImmutableMap.of(
"birthday", 890179200L,
"name", "Employee3",
"id", 2766777393L,
"salary", 92418.21424435009,
"age", (short) 25,
"yoe", 9
),
ImmutableMap.of(
"birthday", 1073001600L,
"name", "Employee4",
"id", 6320361986L,
"salary", 97907.76612488469,
"age", (short) 20,
"yoe", 3
),
ImmutableMap.of(
"birthday", 823996800L,
"is_fulltime", true,
"bonus", 4982.215f,
"name", "Employee5",
"id", 7068152260L,
"salary", 79037.77202099308,
"last_vacation_time", 1706256972000L,
"age", (short) 27,
"yoe", 9
)
)
);
/**
* The expected rows from the second checkpoint file {@code DELTA_TABLE_PATH/_delta_log/00000000000000000001.json}
*/
private static final List<Map<String, Object>> SPLIT_1_EXPECTED_ROWS = new ArrayList<>(
ImmutableList.of(
ImmutableMap.of(
"birthday", 937526400L,
"is_fulltime", false,
"name", "Employee1",
"id", 4693651733L,
"salary", 83845.11357786917,
"age", (short) 24,
"yoe", 3
),
ImmutableMap.of(
"birthday", 810777600L,
"is_fulltime", false,
"name", "Employee2",
"id", 7132772589L,
"salary", 90140.44051385639,
"age", (short) 28,
"yoe", 8
),
ImmutableMap.of(
"birthday", 1104969600L,
"is_fulltime", true,
"bonus", 3699.0881f,
"name", "Employee3",
"id", 6627278510L,
"salary", 58857.27649436368,
"last_vacation_time", 1706458554000L,
"age", (short) 19,
"yoe", 4
),
ImmutableMap.of(
"birthday", 763257600L,
"is_fulltime", true,
"bonus", 2334.6675f,
"name", "Employee4",
"id", 4786204912L,
"salary", 93646.81222022788,
"last_vacation_time", 1706390154000L,
"age", (short) 29,
"yoe", 5
),
ImmutableMap.of(
"birthday", 1114646400L,
"name", "Employee5",
"id", 2773939764L,
"salary", 66300.05339373322,
"age", (short) 18,
"yoe", 3
),
ImmutableMap.of(
"birthday", 913334400L,
"is_fulltime", false,
"name", "Employee6",
"id", 8333438088L,
"salary", 59219.5257906128,
"age", (short) 25,
"yoe", 4
),
ImmutableMap.of(
"birthday", 893894400L,
"is_fulltime", false,
"name", "Employee7",
"id", 8397454007L,
"salary", 61909.733851830584,
"age", (short) 25,
"yoe", 8
),
ImmutableMap.of(
"birthday", 1038873600L,
"is_fulltime", true,
"bonus", 3000.0154f,
"name", "Employee8",
"id", 8925359945L,
"salary", 76588.05471316943,
"last_vacation_time", 1706195754000L,
"age", (short) 21,
"yoe", 1
),
ImmutableMap.of(
"birthday", 989798400L,
"is_fulltime", true,
"bonus", 4463.3833f,
"name", "Employee9",
"id", 8154788551L,
"salary", 59787.98539015684,
"last_vacation_time", 1706181354000L,
"age", (short) 22,
"yoe", 4
),
ImmutableMap.of(
"birthday", 912297600L,
"is_fulltime", false,
"name", "Employee10",
"id", 5884382356L,
"salary", 51565.91965119349,
"age", (short) 25,
"yoe", 9
)
)
);
/**
* Mapping of checkpoint file identifier to the list of expected rows in that checkpoint.
*/
public static final Map<Integer, List<Map<String, Object>>> SPLIT_TO_EXPECTED_ROWS = new HashMap<>(
ImmutableMap.of(
0, SPLIT_0_EXPECTED_ROWS,
1, SPLIT_1_EXPECTED_ROWS
)
);
/**
* Complete set of expected rows across all checkpoint files for {@link #DELTA_TABLE_PATH}.
*/
public static final List<Map<String, Object>> EXPECTED_ROWS = SPLIT_TO_EXPECTED_ROWS.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
/**
* The Druid schema used for ingestion of {@link #DELTA_TABLE_PATH}.
*/
public static final InputRowSchema FULL_SCHEMA = new InputRowSchema(
new TimestampSpec("birthday", "posix", null),
new DimensionsSpec(
ImmutableList.of(
new LongDimensionSchema("id"),
new LongDimensionSchema("birthday"),
new StringDimensionSchema("name"),
new LongDimensionSchema("age"),
new DoubleDimensionSchema("salary"),
new FloatDimensionSchema("bonus"),
new LongDimensionSchema("yoe"),
new StringDimensionSchema("is_fulltime"),
new LongDimensionSchema("last_vacation_time")
)
),
ColumnsFilter.all()
);
/**
* Similar to {@link #FULL_SCHEMA}, but with a smaller set of columns with an inclusion filter applied.
*/
public static final InputRowSchema SCHEMA_1 = new InputRowSchema(
new TimestampSpec("birthday", "posix", null),
new DimensionsSpec(
ImmutableList.of(
new LongDimensionSchema("id"),
new LongDimensionSchema("birthday"),
new StringDimensionSchema("name"),
new LongDimensionSchema("age"),
new DoubleDimensionSchema("salary"),
new FloatDimensionSchema("bonus"),
new LongDimensionSchema("yoe"),
new StringDimensionSchema("is_fulltime"),
new LongDimensionSchema("last_vacation_time")
)
),
ColumnsFilter.inclusionBased(ImmutableSet.of("id", "birthday", "name", "is_fulltime"))
);
/**
* Similar to {@link #FULL_SCHEMA}, but with a smaller set of columns with an exclusion filter applied. A non-existent
* column is added to the exclusion filter - it should silently get thrown away.
*/
public static final InputRowSchema SCHEMA_2 = new InputRowSchema(
new TimestampSpec("birthday", "posix", null),
new DimensionsSpec(
ImmutableList.of(
new LongDimensionSchema("id"),
new LongDimensionSchema("birthday"),
new StringDimensionSchema("name"),
new LongDimensionSchema("age"),
new DoubleDimensionSchema("salary"),
new FloatDimensionSchema("bonus"),
new LongDimensionSchema("yoe"),
new StringDimensionSchema("is_fulltime"),
new LongDimensionSchema("last_vacation_time")
)
),
ColumnsFilter.exclusionBased(ImmutableSet.of("last_vacation_time", "bonus", "non_existent_column"))
);
/**
* A simple wrapper that builds the table scan for {@link #DELTA_TABLE_PATH} meant to be used in tests.
*/
public static Scan getScan(final TableClient tableClient) throws TableNotFoundException
public static Scan getScan(final TableClient tableClient, final String deltaTablePath) throws TableNotFoundException
{
final Table table = Table.forPath(tableClient, DELTA_TABLE_PATH);
final Table table = Table.forPath(tableClient, deltaTablePath);
final Snapshot snapshot = table.getLatestSnapshot(tableClient);
final StructType readSchema = snapshot.getSchema(tableClient);
final ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient)

View File

@@ -0,0 +1,306 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.input;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Refer to extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md to generate the
* sample Delta Lake table used in the unit tests.
*
* <p>
* For a partitioned delta table sample, see {@link PartitionedDeltaTable}.
* </p>
*/
public class NonPartitionedDeltaTable
{
/**
* The non-partitioned Delta table path used by unit tests.
*/
public static final String DELTA_TABLE_PATH = "src/test/resources/employee-delta-table";
/**
* The list of dimensions in the Delta table {@link #DELTA_TABLE_PATH}.
*/
public static final List<String> DIMENSIONS = ImmutableList.of(
"id",
"birthday",
"name",
"age",
"salary",
"bonus",
"yoe",
"is_fulltime",
"last_vacation_time"
);
/**
* The expected set of rows from the first checkpoint file {@code DELTA_TABLE_PATH/_delta_log/00000000000000000000.json}
*/
private static final List<Map<String, Object>> SPLIT_0_EXPECTED_ROWS = new ArrayList<>(
ImmutableList.of(
ImmutableMap.of(
"birthday", 1057881600L,
"name", "Employee1",
"id", 867799346L,
"salary", 87642.55209817083,
"age", (short) 20,
"yoe", 4
),
ImmutableMap.of(
"birthday", 1035417600L,
"is_fulltime", false,
"name", "Employee2",
"id", 9963151889L,
"salary", 79404.63969727767,
"age", (short) 21,
"yoe", 2
),
ImmutableMap.of(
"birthday", 890179200L,
"name", "Employee3",
"id", 2766777393L,
"salary", 92418.21424435009,
"age", (short) 25,
"yoe", 9
),
ImmutableMap.of(
"birthday", 1073001600L,
"name", "Employee4",
"id", 6320361986L,
"salary", 97907.76612488469,
"age", (short) 20,
"yoe", 3
),
ImmutableMap.of(
"birthday", 823996800L,
"is_fulltime", true,
"bonus", 4982.215f,
"name", "Employee5",
"id", 7068152260L,
"salary", 79037.77202099308,
"last_vacation_time", 1706256972000L,
"age", (short) 27,
"yoe", 9
)
)
);
/**
* The expected rows from the second checkpoint file {@code DELTA_TABLE_PATH/_delta_log/00000000000000000001.json}
*/
private static final List<Map<String, Object>> SPLIT_1_EXPECTED_ROWS = new ArrayList<>(
ImmutableList.of(
ImmutableMap.of(
"birthday", 937526400L,
"is_fulltime", false,
"name", "Employee1",
"id", 4693651733L,
"salary", 83845.11357786917,
"age", (short) 24,
"yoe", 3
),
ImmutableMap.of(
"birthday", 810777600L,
"is_fulltime", false,
"name", "Employee2",
"id", 7132772589L,
"salary", 90140.44051385639,
"age", (short) 28,
"yoe", 8
),
ImmutableMap.of(
"birthday", 1104969600L,
"is_fulltime", true,
"bonus", 3699.0881f,
"name", "Employee3",
"id", 6627278510L,
"salary", 58857.27649436368,
"last_vacation_time", 1706458554000L,
"age", (short) 19,
"yoe", 4
),
ImmutableMap.of(
"birthday", 763257600L,
"is_fulltime", true,
"bonus", 2334.6675f,
"name", "Employee4",
"id", 4786204912L,
"salary", 93646.81222022788,
"last_vacation_time", 1706390154000L,
"age", (short) 29,
"yoe", 5
),
ImmutableMap.of(
"birthday", 1114646400L,
"name", "Employee5",
"id", 2773939764L,
"salary", 66300.05339373322,
"age", (short) 18,
"yoe", 3
),
ImmutableMap.of(
"birthday", 913334400L,
"is_fulltime", false,
"name", "Employee6",
"id", 8333438088L,
"salary", 59219.5257906128,
"age", (short) 25,
"yoe", 4
),
ImmutableMap.of(
"birthday", 893894400L,
"is_fulltime", false,
"name", "Employee7",
"id", 8397454007L,
"salary", 61909.733851830584,
"age", (short) 25,
"yoe", 8
),
ImmutableMap.of(
"birthday", 1038873600L,
"is_fulltime", true,
"bonus", 3000.0154f,
"name", "Employee8",
"id", 8925359945L,
"salary", 76588.05471316943,
"last_vacation_time", 1706195754000L,
"age", (short) 21,
"yoe", 1
),
ImmutableMap.of(
"birthday", 989798400L,
"is_fulltime", true,
"bonus", 4463.3833f,
"name", "Employee9",
"id", 8154788551L,
"salary", 59787.98539015684,
"last_vacation_time", 1706181354000L,
"age", (short) 22,
"yoe", 4
),
ImmutableMap.of(
"birthday", 912297600L,
"is_fulltime", false,
"name", "Employee10",
"id", 5884382356L,
"salary", 51565.91965119349,
"age", (short) 25,
"yoe", 9
)
)
);
/**
* Mapping of checkpoint file identifier to the list of expected rows in that checkpoint.
*/
public static final Map<Integer, List<Map<String, Object>>> SPLIT_TO_EXPECTED_ROWS = new HashMap<>(
ImmutableMap.of(
0, SPLIT_0_EXPECTED_ROWS,
1, SPLIT_1_EXPECTED_ROWS
)
);
/**
* Complete set of expected rows across all checkpoint files for {@link #DELTA_TABLE_PATH}.
*/
public static final List<Map<String, Object>> EXPECTED_ROWS = SPLIT_TO_EXPECTED_ROWS.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
/**
* The Druid schema used for ingestion of {@link #DELTA_TABLE_PATH}.
*/
public static final InputRowSchema FULL_SCHEMA = new InputRowSchema(
new TimestampSpec("birthday", "posix", null),
new DimensionsSpec(
ImmutableList.of(
new LongDimensionSchema("id"),
new LongDimensionSchema("birthday"),
new StringDimensionSchema("name"),
new LongDimensionSchema("age"),
new DoubleDimensionSchema("salary"),
new FloatDimensionSchema("bonus"),
new LongDimensionSchema("yoe"),
new StringDimensionSchema("is_fulltime"),
new LongDimensionSchema("last_vacation_time")
)
),
ColumnsFilter.all()
);
/**
* Similar to {@link #FULL_SCHEMA}, but with a smaller set of columns with an inclusion filter applied.
*/
public static final InputRowSchema SCHEMA_1 = new InputRowSchema(
new TimestampSpec("birthday", "posix", null),
new DimensionsSpec(
ImmutableList.of(
new LongDimensionSchema("id"),
new LongDimensionSchema("birthday"),
new StringDimensionSchema("name"),
new LongDimensionSchema("age"),
new DoubleDimensionSchema("salary"),
new FloatDimensionSchema("bonus"),
new LongDimensionSchema("yoe"),
new StringDimensionSchema("is_fulltime"),
new LongDimensionSchema("last_vacation_time")
)
),
ColumnsFilter.inclusionBased(ImmutableSet.of("id", "birthday", "name", "is_fulltime"))
);
/**
* Similar to {@link #FULL_SCHEMA}, but with a smaller set of columns with an exclusion filter applied. A non-existent
* column is added to the exclusion filter - it should be silently ignored.
*/
public static final InputRowSchema SCHEMA_2 = new InputRowSchema(
new TimestampSpec("birthday", "posix", null),
new DimensionsSpec(
ImmutableList.of(
new LongDimensionSchema("id"),
new LongDimensionSchema("birthday"),
new StringDimensionSchema("name"),
new LongDimensionSchema("age"),
new DoubleDimensionSchema("salary"),
new FloatDimensionSchema("bonus"),
new LongDimensionSchema("yoe"),
new StringDimensionSchema("is_fulltime"),
new LongDimensionSchema("last_vacation_time")
)
),
ColumnsFilter.exclusionBased(ImmutableSet.of("last_vacation_time", "bonus", "non_existent_column"))
);
}

View File

@ -0,0 +1,258 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.delta.input;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Refer to extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md to generate the
* sample partitioned Delta Lake table used in the unit tests.
*
* <p>
* For an unpartitioned delta table sample, see {@link NonPartitionedDeltaTable}.
* </p>
*/
public class PartitionedDeltaTable
{
/**
* The Delta table path used by unit tests.
*/
public static final String DELTA_TABLE_PATH = "src/test/resources/employee-delta-table-partitioned-name";
/**
* The list of dimensions in the Delta table {@link #DELTA_TABLE_PATH}.
*/
public static final List<String> DIMENSIONS = ImmutableList.of(
"id",
"birthday",
"name",
"age",
"salary",
"bonus",
"yoe",
"is_fulltime",
"last_vacation_time"
);
/**
* The expected set of rows from the first checkpoint file {@code DELTA_TABLE_PATH/_delta_log/00000000000000000000.json}
*/
private static final List<Map<String, Object>> SPLIT_0_EXPECTED_ROWS = new ArrayList<>(
ImmutableList.of(
ImmutableMap.of(
"birthday", 898992000L,
"name", "Employee1",
"id", 1726247710L,
"salary", 77928.75048595395,
"age", (short) 25,
"yoe", 3
),
ImmutableMap.of(
"birthday", 783475200L,
"is_fulltime", true,
"name", "Employee2",
"id", 6142474489L,
"salary", 57807.64358288189,
"age", (short) 29,
"yoe", 1
),
ImmutableMap.of(
"birthday", 989712000L,
"name", "Employee3",
"id", 3550221591L,
"salary", 58226.41814823942,
"age", (short) 22,
"yoe", 6
),
ImmutableMap.of(
"birthday", 1130025600L,
"name", "Employee4",
"id", 3822742702L,
"salary", 63581.29293955827,
"age", (short) 18,
"yoe", 2
),
ImmutableMap.of(
"birthday", 1001116800L,
"name", "Employee5",
"id", 5611620190L,
"salary", 76076.68269796186,
"age", (short) 22,
"yoe", 3
)
)
);
/**
* The expected rows from the second checkpoint file {@code DELTA_TABLE_PATH/_delta_log/00000000000000000001.json}
*/
private static final List<Map<String, Object>> SPLIT_1_EXPECTED_ROWS = new ArrayList<>(
ImmutableList.of(
ImmutableMap.of(
"birthday", 1058227200L,
"is_fulltime", false,
"name", "Employee1",
"id", 74065452L,
"salary", 73109.56096784897,
"age", (short) 20,
"yoe", 3
),
ImmutableMap.of(
"birthday", 930528000L,
"is_fulltime", true,
"name", "Employee2",
"id", 7246574606L,
"salary", 54723.608212239684,
"age", (short) 24,
"yoe", 5
),
ImmutableMap.of(
"birthday", 863654400L,
"is_fulltime", true,
"bonus", 1424.9856f,
"name", "Employee3",
"id", 743868531L,
"salary", 59595.17550553535,
"last_vacation_time", 1712918081000L,
"age", (short) 26,
"yoe", 8
),
ImmutableMap.of(
"birthday", 850780800L,
"name", "Employee4",
"id", 4750981713L,
"salary", 85673.13564089558,
"age", (short) 27,
"yoe", 8
),
ImmutableMap.of(
"birthday", 986256000L,
"name", "Employee5",
"id", 2605140287L,
"salary", 56740.37076828715,
"age", (short) 23,
"yoe", 5
)
)
);
/**
* The expected rows from the third checkpoint file {@code DELTA_TABLE_PATH/_delta_log/00000000000000000002.json}
*/
private static final List<Map<String, Object>> SPLIT_2_EXPECTED_ROWS = new ArrayList<>(
ImmutableList.of(
ImmutableMap.of(
"birthday", 885168000L,
"name", "Employee1",
"id", 4922151803L,
"salary", 63418.10754490299,
"age", (short) 26,
"yoe", 10
),
ImmutableMap.of(
"birthday", 806198400L,
"name", "Employee2",
"id", 9345771736L,
"salary", 58610.730719740226,
"age", (short) 28,
"yoe", 10
),
ImmutableMap.of(
"birthday", 1120435200L,
"name", "Employee3",
"id", 4740025087L,
"salary", 63256.1008903906,
"age", (short) 18,
"yoe", 1
),
ImmutableMap.of(
"birthday", 968284800L,
"is_fulltime", false,
"name", "Employee4",
"id", 655456941L,
"salary", 95552.47057273184,
"age", (short) 23,
"yoe", 1
),
ImmutableMap.of(
"birthday", 1124841600L,
"name", "Employee5",
"id", 5565370685L,
"salary", 74066.92920109774,
"age", (short) 18,
"yoe", 1
)
)
);
/**
* Mapping of checkpoint file identifier to the list of expected rows in that checkpoint.
*/
public static final Map<Integer, List<Map<String, Object>>> SPLIT_TO_EXPECTED_ROWS = new HashMap<>(
ImmutableMap.of(
0, SPLIT_0_EXPECTED_ROWS,
1, SPLIT_1_EXPECTED_ROWS,
2, SPLIT_2_EXPECTED_ROWS
)
);
/**
* Complete set of expected rows across all checkpoint files for {@link #DELTA_TABLE_PATH}.
*/
public static final List<Map<String, Object>> EXPECTED_ROWS = SPLIT_TO_EXPECTED_ROWS.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
/**
* The Druid schema used for ingestion of {@link #DELTA_TABLE_PATH}.
*/
public static final InputRowSchema FULL_SCHEMA = new InputRowSchema(
new TimestampSpec("birthday", "posix", null),
new DimensionsSpec(
ImmutableList.of(
new LongDimensionSchema("id"),
new LongDimensionSchema("birthday"),
new StringDimensionSchema("name"),
new LongDimensionSchema("age"),
new DoubleDimensionSchema("salary"),
new FloatDimensionSchema("bonus"),
new LongDimensionSchema("yoe"),
new StringDimensionSchema("is_fulltime"),
new LongDimensionSchema("last_vacation_time")
)
),
ColumnsFilter.all()
);
}

View File

@ -25,15 +25,29 @@ import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collection;
public class RowSerdeTest
{
@Test
public void testSerializeDeserializeRoundtrip() throws TableNotFoundException
public static Collection<Object[]> data()
{
Object[][] data = new Object[][]{
{NonPartitionedDeltaTable.DELTA_TABLE_PATH},
{PartitionedDeltaTable.DELTA_TABLE_PATH}
};
return Arrays.asList(data);
}
@MethodSource("data")
@ParameterizedTest(name = "{index}: with table path {0}")
public void testSerializeDeserializeRoundtrip(final String tablePath) throws TableNotFoundException
{
final DefaultTableClient tableClient = DefaultTableClient.create(new Configuration());
final Scan scan = DeltaTestUtils.getScan(tableClient);
final Scan scan = DeltaTestUtils.getScan(tableClient, tablePath);
final Row scanState = scan.getScanState(tableClient);
final String rowJson = RowSerde.serializeRowToJson(scanState);
@ -41,5 +55,4 @@ public class RowSerdeTest
Assert.assertEquals(scanState.getSchema(), row.getSchema());
}
}

View File

@ -44,25 +44,43 @@ Delta table to `resources/employee-delta-table`. You can override the defaults b
```shell
python3 create_delta_table.py -h
usage: create_delta_table.py [-h] [--save_mode {append,overwrite}] [--save_path SAVE_PATH] [--num_records NUM_RECORDS]
usage: create_delta_table.py [-h] --save_path SAVE_PATH [--save_mode {append,overwrite}] [--partitioned_by {date,name}] [--num_records NUM_RECORDS]
Script to write a Delta Lake table.
optional arguments:
options:
-h, --help show this help message and exit
--save_path SAVE_PATH
Save path for Delta table (default: None)
--save_mode {append,overwrite}
Specify write mode (append/overwrite) (default: append)
--save_path SAVE_PATH
Save path for Delta table (default: <DRUID_BASE_PATH>/druid/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table)
--partitioned_by {date,name}
Partitioned by columns (default: None)
--num_records NUM_RECORDS
Specify number of Delta records to write (default: 10)
Specify number of Delta records to write (default: 5)
```
The test data in `resources/employee-delta-table` was generated by:
### Non-partitioned table `employee-delta-table`:
The test data in `resources/employee-delta-table` contains 15 Delta records generated over 2 snapshots.
The table was generated by running the following commands:
```shell
python3 create_delta_table.py
python3 create_delta_table.py --num_records=5 --save_mode=append
python3 create_delta_table.py --save_path=employee-delta-table --num_records=10
python3 create_delta_table.py --save_path=employee-delta-table
```
This creates a total of 15 Delta records across two transactional commits. The resulting Delta table is checked in
to the repo. The expected rows in `DeltaTestUtils.java` are updated accordingly.
The resulting Delta table is checked in to the repo. The expected rows to be used in tests are updated in
`NonPartitionedDeltaTable.java` accordingly.
### Partitioned table `employee-delta-table-partitioned-name`:
The test data in `resources/employee-delta-table-partitioned-name` contains 15 Delta records generated over 3 snapshots.
This table is partitioned by the name column. The table was generated by running the following commands:
```shell
python3 create_delta_table.py --save_path=employee-delta-table-partitioned-name --partitioned_by=name
python3 create_delta_table.py --save_path=employee-delta-table-partitioned-name --partitioned_by=name
python3 create_delta_table.py --save_path=employee-delta-table-partitioned-name --partitioned_by=name
```
The resulting Delta table is checked in to the repo. The expected rows to be used in tests are updated in
`PartitionedDeltaTable.java` accordingly.
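Each snapshot above appends one commit file under the table's `_delta_log/` directory. As a quick sanity check when regenerating tables, the per-snapshot row count can be read back from the `numOutputRows` operation metric in the `commitInfo` line of each commit file. A minimal sketch (the helper name `commit_row_count` is illustrative, not part of the repo):

```python
import json

def commit_row_count(commit_file_lines):
    """Sum numOutputRows over the commitInfo actions of a Delta commit file."""
    total = 0
    for line in commit_file_lines:
        action = json.loads(line)
        if "commitInfo" in action:
            # operationMetrics values are stored as strings in the Delta log.
            total += int(action["commitInfo"]["operationMetrics"]["numOutputRows"])
    return total

# Example: a trimmed-down commitInfo line similar to the checked-in logs.
sample = ['{"commitInfo":{"operation":"WRITE","operationMetrics":{"numFiles":"5","numOutputRows":"5"}}}']
print(commit_row_count(sample))  # → 5
```

For the partitioned table above, summing this metric across its three commit files should give the documented 15 records.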

View File

@ -18,7 +18,7 @@
import os
import argparse
import delta
from delta import *
import pyspark
from pyspark.sql.types import StructType, StructField, ShortType, StringType, TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType, BooleanType
from datetime import datetime, timedelta
@ -34,7 +34,7 @@ def config_spark_with_delta_lake():
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
)
)
spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
return spark
@ -94,28 +94,33 @@ def main():
parser = argparse.ArgumentParser(description="Script to write a Delta Lake table.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--save_mode', choices=('append', 'overwrite'), default="overwrite",
parser.add_argument('--save_path', default=None, required=True, help="Save path for Delta table")
parser.add_argument('--save_mode', choices=('append', 'overwrite'), default="append",
help="Specify write mode (append/overwrite)")
parser.add_argument('--save_path', default=os.path.join(os.getcwd(), "employee-delta-table"),
help="Save path for Delta table")
parser.add_argument('--num_records', type=int, default=10,
help="Specify number of Delta records to write")
parser.add_argument('--partitioned_by', choices=("date", "name"), default=None,
help="Column to partition the Delta table")
parser.add_argument('--num_records', type=int, default=5, help="Specify number of Delta records to write")
args = parser.parse_args()
save_mode = args.save_mode
save_path = args.save_path
num_records = args.num_records
partitioned_by = args.partitioned_by
spark = config_spark_with_delta_lake()
data, schema = create_dataset(num_records=num_records)
df = spark.createDataFrame(data, schema=schema)
df.write.format("delta").mode(save_mode).save(save_path)
if not partitioned_by:
df.write.format("delta").mode(save_mode).save(save_path)
else:
df.write.format("delta").partitionBy("name").mode(save_mode).save(save_path)
df.show()
print(f"Generated Delta records to {save_path} in {save_mode} mode with {num_records} records.")
print(f"Generated Delta table records partitioned by {partitioned_by} in {save_path} in {save_mode} mode"
f" with {num_records} records.")
if __name__ == "__main__":

View File

@ -0,0 +1,8 @@
{"commitInfo":{"timestamp":1713151902031,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"name\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"5","numOutputRows":"5","numOutputBytes":"10199"},"engineInfo":"Apache-Spark/3.5.1 Delta-Lake/3.1.0","txnId":"c79386c6-581f-4624-a5a5-b04298b173d2"}}
{"metaData":{"id":"45cfe982-fdfe-4d8c-ad30-7d0eb3acf821","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"bonus\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"yoe\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"is_fulltime\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_vacation_time\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["name"],"configuration":{},"createdTime":1713151899961}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"name=Employee1/part-00001-4f11e631-348f-4378-936e-34132f176203.c000.snappy.parquet","partitionValues":{"name":"Employee1"},"size":2034,"modificationTime":1713151901969,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4922151803,\"birthday\":\"1998-01-19\",\"age\":26,\"salary\":63418.10754490299,\"yoe\":10},\"maxValues\":{\"id\":4922151803,\"birthday\":\"1998-01-19\",\"age\":26,\"salary\":63418.10754490299,\"yoe\":10},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":1,\"last_vacation_time\":1}}"}}
{"add":{"path":"name=Employee2/part-00003-07285317-1943-4b24-8962-03543375d133.c000.snappy.parquet","partitionValues":{"name":"Employee2"},"size":2033,"modificationTime":1713151901968,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":9345771736,\"birthday\":\"1995-07-20\",\"age\":28,\"salary\":58610.730719740226,\"yoe\":10},\"maxValues\":{\"id\":9345771736,\"birthday\":\"1995-07-20\",\"age\":28,\"salary\":58610.730719740226,\"yoe\":10},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":1,\"last_vacation_time\":1}}"}}
{"add":{"path":"name=Employee3/part-00005-ac0ede62-3abc-47a3-9eac-c09a3802cd78.c000.snappy.parquet","partitionValues":{"name":"Employee3"},"size":2034,"modificationTime":1713151901968,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4740025087,\"birthday\":\"2005-07-04\",\"age\":18,\"salary\":63256.1008903906,\"yoe\":1},\"maxValues\":{\"id\":4740025087,\"birthday\":\"2005-07-04\",\"age\":18,\"salary\":63256.1008903906,\"yoe\":1},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":1,\"last_vacation_time\":1}}"}}
{"add":{"path":"name=Employee4/part-00007-45c2fd36-d1e1-4e92-b21c-84d385a8218a.c000.snappy.parquet","partitionValues":{"name":"Employee4"},"size":2049,"modificationTime":1713151901969,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":655456941,\"birthday\":\"2000-09-07\",\"age\":23,\"salary\":95552.47057273184,\"yoe\":1},\"maxValues\":{\"id\":655456941,\"birthday\":\"2000-09-07\",\"age\":23,\"salary\":95552.47057273184,\"yoe\":1},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":1}}"}}
{"add":{"path":"name=Employee5/part-00009-079ed08f-dd8d-434f-a816-c73420234b25.c000.snappy.parquet","partitionValues":{"name":"Employee5"},"size":2049,"modificationTime":1713151901969,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":5565370685,\"birthday\":\"2005-08-24\",\"age\":18,\"salary\":74066.92920109774,\"yoe\":1},\"maxValues\":{\"id\":5565370685,\"birthday\":\"2005-08-24\",\"age\":18,\"salary\":74066.92920109774,\"yoe\":1},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":1}}"}}
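One gotcha when reading commit files like the one above: the `stats` field of each `add` action is itself a JSON-encoded string, so it must be decoded a second time before `numRecords` or the min/max values are reachable. A minimal sketch (the helper name `records_in_add_actions` is illustrative; field names are taken from the log lines above):

```python
import json

def records_in_add_actions(log_lines):
    """Count data rows by decoding the nested stats string of each add action."""
    total = 0
    for line in log_lines:
        action = json.loads(line)
        add = action.get("add")
        if add is not None:
            stats = json.loads(add["stats"])  # stats is a JSON string, not an object
            total += stats["numRecords"]
    return total

sample = [
    '{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}',
    '{"add":{"path":"name=Employee1/part-00001.c000.snappy.parquet","stats":"{\\"numRecords\\":1}"}}',
]
print(records_in_add_actions(sample))  # → 1
```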

View File

@ -0,0 +1,6 @@
{"commitInfo":{"timestamp":1713152087613,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"name\"]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"5","numOutputRows":"5","numOutputBytes":"10643"},"engineInfo":"Apache-Spark/3.5.1 Delta-Lake/3.1.0","txnId":"b1dfdd19-fd45-40e0-bda3-c19beb391488"}}
{"add":{"path":"name=Employee1/part-00001-1b911f24-6d69-4065-9c4e-d5fa896dcefe.c000.snappy.parquet","partitionValues":{"name":"Employee1"},"size":2049,"modificationTime":1713152085839,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":74065452,\"birthday\":\"2003-07-15\",\"age\":20,\"salary\":73109.56096784897,\"yoe\":3},\"maxValues\":{\"id\":74065452,\"birthday\":\"2003-07-15\",\"age\":20,\"salary\":73109.56096784897,\"yoe\":3},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":1}}"}}
{"add":{"path":"name=Employee2/part-00003-090fd396-1c53-4794-97b3-faa0f302984a.c000.snappy.parquet","partitionValues":{"name":"Employee2"},"size":2187,"modificationTime":1713152085839,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":7246574606,\"birthday\":\"1999-06-28\",\"age\":24,\"salary\":54723.608212239684,\"bonus\":1260.9291,\"yoe\":5,\"last_vacation_time\":\"2024-04-13T06:34:41.385-07:00\"},\"maxValues\":{\"id\":7246574606,\"birthday\":\"1999-06-28\",\"age\":24,\"salary\":54723.608212239684,\"bonus\":1260.9291,\"yoe\":5,\"last_vacation_time\":\"2024-04-13T06:34:41.385-07:00\"},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":0,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":0}}"}}
{"add":{"path":"name=Employee3/part-00005-32e5492c-7ebf-407e-8ecf-03add4ee14b8.c000.snappy.parquet","partitionValues":{"name":"Employee3"},"size":2187,"modificationTime":1713152085839,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":743868531,\"birthday\":\"1997-05-15\",\"age\":26,\"salary\":59595.17550553535,\"bonus\":1424.9856,\"yoe\":8,\"last_vacation_time\":\"2024-04-12T03:34:41.385-07:00\"},\"maxValues\":{\"id\":743868531,\"birthday\":\"1997-05-15\",\"age\":26,\"salary\":59595.17550553535,\"bonus\":1424.9856,\"yoe\":8,\"last_vacation_time\":\"2024-04-12T03:34:41.385-07:00\"},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":0,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":0}}"}}
{"add":{"path":"name=Employee4/part-00007-d88803d4-2bb0-4c31-8340-58cb6d797963.c000.snappy.parquet","partitionValues":{"name":"Employee4"},"size":2033,"modificationTime":1713152085839,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4750981713,\"birthday\":\"1996-12-17\",\"age\":27,\"salary\":85673.13564089558,\"yoe\":8},\"maxValues\":{\"id\":4750981713,\"birthday\":\"1996-12-17\",\"age\":27,\"salary\":85673.13564089558,\"yoe\":8},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":1,\"last_vacation_time\":1}}"}}
{"add":{"path":"name=Employee5/part-00009-f87803c3-6cfd-4a37-9283-f2bff0c0dfad.c000.snappy.parquet","partitionValues":{"name":"Employee5"},"size":2187,"modificationTime":1713152085839,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2605140287,\"birthday\":\"2001-04-03\",\"age\":23,\"salary\":56740.37076828715,\"bonus\":3912.511,\"yoe\":5,\"last_vacation_time\":\"2024-04-13T14:34:41.385-07:00\"},\"maxValues\":{\"id\":2605140287,\"birthday\":\"2001-04-03\",\"age\":23,\"salary\":56740.37076828715,\"bonus\":3912.511,\"yoe\":5,\"last_vacation_time\":\"2024-04-13T14:34:41.385-07:00\"},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":0,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":0}}"}}

View File

@ -0,0 +1,6 @@
{"commitInfo":{"timestamp":1713152124948,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"name\"]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"5","numOutputRows":"5","numOutputBytes":"10505"},"engineInfo":"Apache-Spark/3.5.1 Delta-Lake/3.1.0","txnId":"87a05abb-fae3-47c4-af4a-b185e23004c5"}}
{"add":{"path":"name=Employee1/part-00001-615707f3-eb13-47ef-ac1a-b8decc09e05a.c000.snappy.parquet","partitionValues":{"name":"Employee1"},"size":2187,"modificationTime":1713152123251,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1726247710,\"birthday\":\"1998-06-28\",\"age\":25,\"salary\":77928.75048595395,\"bonus\":4976.98,\"yoe\":3,\"last_vacation_time\":\"2024-04-13T22:35:19.168-07:00\"},\"maxValues\":{\"id\":1726247710,\"birthday\":\"1998-06-28\",\"age\":25,\"salary\":77928.75048595395,\"bonus\":4976.98,\"yoe\":3,\"last_vacation_time\":\"2024-04-13T22:35:19.168-07:00\"},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":0,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":0}}"}}
{"add":{"path":"name=Employee2/part-00003-62ce8217-f361-4b70-91ec-9f398300c083.c000.snappy.parquet","partitionValues":{"name":"Employee2"},"size":2186,"modificationTime":1713152123251,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":6142474489,\"birthday\":\"1994-10-30\",\"age\":29,\"salary\":57807.64358288189,\"bonus\":3662.5002,\"yoe\":1,\"last_vacation_time\":\"2024-04-14T16:35:19.168-07:00\"},\"maxValues\":{\"id\":6142474489,\"birthday\":\"1994-10-30\",\"age\":29,\"salary\":57807.64358288189,\"bonus\":3662.5002,\"yoe\":1,\"last_vacation_time\":\"2024-04-14T16:35:19.168-07:00\"},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":0,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":0}}"}}
{"add":{"path":"name=Employee3/part-00005-c33dc31c-d3a8-4a50-90d3-96f00b1b2e22.c000.snappy.parquet","partitionValues":{"name":"Employee3"},"size":2049,"modificationTime":1713152123251,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3550221591,\"birthday\":\"2001-05-13\",\"age\":22,\"salary\":58226.41814823942,\"yoe\":6},\"maxValues\":{\"id\":3550221591,\"birthday\":\"2001-05-13\",\"age\":22,\"salary\":58226.41814823942,\"yoe\":6},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":1}}"}}
{"add":{"path":"name=Employee4/part-00007-b6e49fa4-cb41-4bd1-8dd2-1ed5e561f801.c000.snappy.parquet","partitionValues":{"name":"Employee4"},"size":2049,"modificationTime":1713152123251,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3822742702,\"birthday\":\"2005-10-23\",\"age\":18,\"salary\":63581.29293955827,\"yoe\":2},\"maxValues\":{\"id\":3822742702,\"birthday\":\"2005-10-23\",\"age\":18,\"salary\":63581.29293955827,\"yoe\":2},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":0,\"last_vacation_time\":1}}"}}
{"add":{"path":"name=Employee5/part-00009-b8de3a44-b0e9-4d68-89ee-195b76453643.c000.snappy.parquet","partitionValues":{"name":"Employee5"},"size":2034,"modificationTime":1713152123251,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":5611620190,\"birthday\":\"2001-09-22\",\"age\":22,\"salary\":76076.68269796186,\"yoe\":3},\"maxValues\":{\"id\":5611620190,\"birthday\":\"2001-09-22\",\"age\":22,\"salary\":76076.68269796186,\"yoe\":3},\"nullCount\":{\"id\":0,\"birthday\":0,\"age\":0,\"salary\":0,\"bonus\":1,\"yoe\":0,\"is_fulltime\":1,\"last_vacation_time\":1}}"}}

View File

@ -1,2 +1,3 @@
deltalake==0.16.3
delta-spark==3.1.0
pyspark==3.5.0