diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md index eab8b75d8d3..ee5c42cb2e5 100644 --- a/docs/ingestion/input-sources.md +++ b/docs/ingestion/input-sources.md @@ -1121,6 +1121,17 @@ This input source provides the following filters: `and`, `equals`, `interval`, a |type|Set this value to `not`.|yes| |filter|The iceberg filter on which logical NOT is applied|yes| +`range` Filter: + +|Property|Description|Default|Required| +|--------|-----------|-------|--------| +|type|Set this value to `range`.|None|yes| +|filterColumn|The column name from the iceberg table schema based on which range filtering needs to happen.|None|yes| +|lower|Lower bound value to match.|None|no. At least one of `lower` or `upper` must not be null.| +|upper|Upper bound value to match.|None|no. At least one of `lower` or `upper` must not be null.| +|lowerOpen|Boolean indicating if lower bound is open in the interval of values defined by the range (">" instead of ">="). |false|no| +|upperOpen|Boolean indicating if upper bound is open in the interval of values defined by the range ("<" instead of "<=").
|false|no| + ## Delta Lake input source :::info diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml index 5b1bf649514..67e318afa3f 100644 --- a/extensions-contrib/druid-iceberg-extensions/pom.xml +++ b/extensions-contrib/druid-iceberg-extensions/pom.xml @@ -206,10 +206,6 @@ org.slf4j slf4j-reload4j - - com.google.re2j - re2j - com.google.code.gson gson diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java index 10c07cdbe24..cff8797d60b 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java @@ -33,7 +33,8 @@ import org.apache.iceberg.expressions.Expression; @JsonSubTypes.Type(name = "equals", value = IcebergEqualsFilter.class), @JsonSubTypes.Type(name = "and", value = IcebergAndFilter.class), @JsonSubTypes.Type(name = "not", value = IcebergNotFilter.class), - @JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class) + @JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class), + @JsonSubTypes.Type(name = "range", value = IcebergRangeFilter.class) }) public interface IcebergFilter { diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergRangeFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergRangeFilter.java new file mode 100644 index 00000000000..2e3f42dbba9 --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergRangeFilter.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.filter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +public class IcebergRangeFilter implements IcebergFilter +{ + @JsonProperty + private final String filterColumn; + @JsonProperty + private final Boolean lowerOpen; + @JsonProperty + private final Boolean upperOpen; + @JsonProperty + private final Object lower; + @JsonProperty + private final Object upper; + + @JsonCreator + public IcebergRangeFilter( + @JsonProperty("filterColumn") String filterColumn, + @JsonProperty("lower") @Nullable Object lower, + @JsonProperty("upper") @Nullable Object upper, + @JsonProperty("lowerOpen") @Nullable Boolean lowerOpen, + @JsonProperty("upperOpen") @Nullable Boolean upperOpen + ) + { + Preconditions.checkNotNull(filterColumn, "You must specify a filter column on the range filter"); + Preconditions.checkArgument(lower != null || upper != null, "Both lower and upper bounds cannot be empty"); + 
this.filterColumn = filterColumn; + this.lowerOpen = lowerOpen != null ? lowerOpen : false; + this.upperOpen = upperOpen != null ? upperOpen : false; + this.lower = lower; + this.upper = upper; + } + + + @Override + public TableScan filter(TableScan tableScan) + { + return tableScan.filter(getFilterExpression()); + } + + @Override + public Expression getFilterExpression() + { + List<Expression> expressions = new ArrayList<>(); + // One predicate per non-null bound; the constructor guarantees at least one bound, so get(0) below is safe. + if (lower != null) { + Expression lowerExp = lowerOpen + ? Expressions.greaterThan(filterColumn, lower) + : Expressions.greaterThanOrEqual(filterColumn, lower); + expressions.add(lowerExp); + } + if (upper != null) { + Expression upperExp = upperOpen + ? Expressions.lessThan(filterColumn, upper) + : Expressions.lessThanOrEqual(filterColumn, upper); + expressions.add(upperExp); + } + if (expressions.size() == 2) { + return Expressions.and(expressions.get(0), expressions.get(1)); + } + return expressions.get(0); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergRangeFilterTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergRangeFilterTest.java new file mode 100644 index 00000000000..c85ef7c8feb --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergRangeFilterTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.filter; + +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.junit.Assert; +import org.junit.Test; + +public class IcebergRangeFilterTest +{ + private static final String TEST_COLUMN = "column1"; + + @Test + public void testUpperOpenFilter() + { + Expression expectedExpression = Expressions.and( + Expressions.greaterThanOrEqual(TEST_COLUMN, 45), + Expressions.lessThan(TEST_COLUMN, 50) + ); + IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 45, 50, false, true); + Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString()); + } + + @Test + public void testLowerOpenFilter() + { + Expression expectedExpression = Expressions.and( + Expressions.greaterThan(TEST_COLUMN, 45), + Expressions.lessThanOrEqual(TEST_COLUMN, 50) + ); + IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 45, 50, true, false); + Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString()); + } + + @Test + public void testNoLowerFilter() + { + Expression expectedExpression = Expressions.lessThanOrEqual(TEST_COLUMN, 50); + IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, null, 50, null, false); + Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString()); + } + + @Test + public void testNoUpperFilter() + { + Expression expectedExpression = Expressions.greaterThanOrEqual(TEST_COLUMN, 100); + IcebergRangeFilter rangeFilter
= new IcebergRangeFilter(TEST_COLUMN, 100, null, null, null); + Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString()); + } +} diff --git a/website/.spelling b/website/.spelling index 0d4aeaf09a8..037e6b50bc1 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1307,9 +1307,11 @@ kafka.topic keyColumnName keyFormat listDelimiter +lowerOpen timestamp timestampColumnName timestampSpec +upperOpen urls valueFormat 1GB