Add range filtering support for iceberg ingestion (#15782)

* Add range filtering support for iceberg ingestion

* Docs formatting

* Spelling
This commit is contained in:
Atul Mohan 2024-02-01 23:32:30 -08:00 committed by GitHub
parent 223f29d64c
commit 2e46a98024
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 176 additions and 5 deletions

View File

@ -1121,6 +1121,17 @@ This input source provides the following filters: `and`, `equals`, `interval`, a
|type|Set this value to `not`.|yes| |type|Set this value to `not`.|yes|
|filter|The iceberg filter on which logical NOT is applied|yes| |filter|The iceberg filter on which logical NOT is applied|yes|
`range` Filter:
|Property|Description|Default|Required|
|--------|-----------|-------|--------|
|type|Set this value to `range`.|None|yes|
|filterColumn|The column name from the iceberg table schema based on which range filtering needs to happen.|None|yes|
|lower|Lower bound value to match.|None|no. At least one of `lower` or `upper` must not be null.|
|upper|Upper bound value to match. |None|no. At least one of `lower` or `upper` must not be null.|
|lowerOpen|Boolean indicating if lower bound is open in the interval of values defined by the range (">" instead of ">="). |false|no|
|upperOpen|Boolean indicating if upper bound is open on the interval of values defined by range ("<" instead of "<="). |false|no|
## Delta Lake input source ## Delta Lake input source
:::info :::info

View File

@ -206,10 +206,6 @@
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId> <artifactId>slf4j-reload4j</artifactId>
</exclusion> </exclusion>
<exclusion>
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
</exclusion>
<exclusion> <exclusion>
<groupId>com.google.code.gson</groupId> <groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId> <artifactId>gson</artifactId>

View File

@ -33,7 +33,8 @@ import org.apache.iceberg.expressions.Expression;
@JsonSubTypes.Type(name = "equals", value = IcebergEqualsFilter.class), @JsonSubTypes.Type(name = "equals", value = IcebergEqualsFilter.class),
@JsonSubTypes.Type(name = "and", value = IcebergAndFilter.class), @JsonSubTypes.Type(name = "and", value = IcebergAndFilter.class),
@JsonSubTypes.Type(name = "not", value = IcebergNotFilter.class), @JsonSubTypes.Type(name = "not", value = IcebergNotFilter.class),
@JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class) @JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class),
@JsonSubTypes.Type(name = "range", value = IcebergRangeFilter.class)
}) })
public interface IcebergFilter public interface IcebergFilter
{ {

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
public class IcebergRangeFilter implements IcebergFilter
{
@JsonProperty
private final String filterColumn;
@JsonProperty
private final Boolean lowerOpen;
@JsonProperty
private final Boolean upperOpen;
@JsonProperty
private final Object lower;
@JsonProperty
private final Object upper;
@JsonCreator
public IcebergRangeFilter(
@JsonProperty("filterColumn") String filterColumn,
@JsonProperty("lower") @Nullable Object lower,
@JsonProperty("upper") @Nullable Object upper,
@JsonProperty("lowerOpen") @Nullable Boolean lowerOpen,
@JsonProperty("upperOpen") @Nullable Boolean upperOpen
)
{
Preconditions.checkNotNull(filterColumn, "You must specify a filter column on the range filter");
Preconditions.checkArgument(lower != null || upper != null, "Both lower and upper bounds cannot be empty");
this.filterColumn = filterColumn;
this.lowerOpen = lowerOpen != null ? lowerOpen : false;
this.upperOpen = upperOpen != null ? upperOpen : false;
this.lower = lower;
this.upper = upper;
}
@Override
public TableScan filter(TableScan tableScan)
{
return tableScan.filter(getFilterExpression());
}
@Override
public Expression getFilterExpression()
{
List<Expression> expressions = new ArrayList<>();
if (lower != null) {
Expression lowerExp = lowerOpen
? Expressions.greaterThan(filterColumn, lower)
: Expressions.greaterThanOrEqual(filterColumn, lower);
expressions.add(lowerExp);
}
if (upper != null) {
Expression upperExp = upperOpen
? Expressions.lessThan(filterColumn, upper)
: Expressions.lessThanOrEqual(filterColumn, upper);
expressions.add(upperExp);
}
if (expressions.size() == 2) {
return Expressions.and(expressions.get(0), expressions.get(1));
}
return expressions.get(0);
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.iceberg.filter;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.junit.Assert;
import org.junit.Test;
public class IcebergRangeFilterTest
{
private final String TEST_COLUMN = "column1";
@Test
public void testUpperOpenFilter()
{
Expression expectedExpression = Expressions.and(
Expressions.greaterThanOrEqual(TEST_COLUMN, 45),
Expressions.lessThan(TEST_COLUMN, 50)
);
IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 45, 50, false, true);
Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
}
@Test
public void testLowerOpenFilter()
{
Expression expectedExpression = Expressions.and(
Expressions.greaterThan(TEST_COLUMN, 45),
Expressions.lessThanOrEqual(TEST_COLUMN, 50)
);
IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 45, 50, true, false);
Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
}
@Test
public void testNoLowerFilter()
{
Expression expectedExpression = Expressions.lessThanOrEqual(TEST_COLUMN, 50);
IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, null, 50, null, false);
Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
}
@Test
public void testNoUpperFilter()
{
Expression expectedExpression = Expressions.greaterThanOrEqual(TEST_COLUMN, 100);
IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 100, null, null, null);
Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
}
}

View File

@ -1307,9 +1307,11 @@ kafka.topic
keyColumnName keyColumnName
keyFormat keyFormat
listDelimiter listDelimiter
lowerOpen
timestamp timestamp
timestampColumnName timestampColumnName
timestampSpec timestampSpec
upperOpen
urls urls
valueFormat valueFormat
1GB 1GB