Window Function offset correction for RAC (#16718)

* When an ArrayList RAC creates a child RAC, the start and end offsets need to have the offset of parent's start offset
* Defaults the 2nd window bound to CURRENT ROW when only a single bound is specified
* Removes the windowingStrictValidation warning and throws a hard exception when Order By alongside RANGE clause is not provided with UNBOUNDED or CURRENT ROW as both bounds
This commit is contained in:
Sree Charan Manamala 2024-07-15 16:13:27 +05:30 committed by GitHub
parent 64104533ac
commit 78a4a09d01
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 158 additions and 68 deletions

View File

@ -712,8 +712,7 @@ public class VarianceSqlAggregatorTest extends BaseCalciteQueryTest
+ "group by dim4, dim5, mod(m1, 3)")
.queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true,
QueryContexts.WINDOWING_STRICT_VALIDATION, false
QueryContexts.ENABLE_DEBUG, true
))
.expectedResults(ImmutableList.of(
new Object[]{"a", "aa", 1.0D, 0.0D},

View File

@ -613,14 +613,6 @@ public class QueryContext
);
}
public boolean isWindowingStrictValidation()
{
return getBoolean(
QueryContexts.WINDOWING_STRICT_VALIDATION,
QueryContexts.DEFAULT_WINDOWING_STRICT_VALIDATION
);
}
public boolean isCatalogValidationEnabled()
{
return getBoolean(

View File

@ -87,7 +87,6 @@ public class QueryContexts
public static final String SERIALIZE_DATE_TIME_AS_LONG_INNER_KEY = "serializeDateTimeAsLongInner";
public static final String UNCOVERED_INTERVALS_LIMIT_KEY = "uncoveredIntervalsLimit";
public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
public static final String WINDOWING_STRICT_VALIDATION = "windowingStrictValidation";
public static final String CATALOG_VALIDATION_ENABLED = "catalogValidationEnabled";
// Unique identifier for the query, that is used to map the global shared resources (specifically merge buffers) to the
// query's runtime
@ -126,7 +125,6 @@ public class QueryContexts
public static final int DEFAULT_IN_FUNCTION_THRESHOLD = 100;
public static final int DEFAULT_IN_FUNCTION_EXPR_THRESHOLD = 2;
public static final boolean DEFAULT_ENABLE_TIME_BOUNDARY_PLANNING = false;
public static final boolean DEFAULT_WINDOWING_STRICT_VALIDATION = true;
public static final boolean DEFAULT_CATALOG_VALIDATION_ENABLED = true;
@SuppressWarnings("unused") // Used by Jackson serialization

View File

@ -259,8 +259,8 @@ public class ArrayListRowsAndColumns<RowType> implements AppendableRowsAndColumn
rowSignature,
extraColumns,
columnNames,
startOffset,
endOffset
this.startOffset + startOffset,
this.startOffset + endOffset
);
}

View File

@ -151,15 +151,6 @@ public class QueryContextsTest
);
}
@Test
public void testDefaultWindowingStrictValidation()
{
Assert.assertEquals(
QueryContexts.DEFAULT_WINDOWING_STRICT_VALIDATION,
QueryContext.empty().isWindowingStrictValidation()
);
}
@Test
public void testCatalogValidationEnabled()
{

View File

@ -19,12 +19,18 @@
package org.apache.druid.query.rowsandcols;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.segment.column.RowSignature;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.function.Function;
public class ArrayListRowsAndColumnsTest extends RowsAndColumnsTestBase
@ -35,7 +41,10 @@ public class ArrayListRowsAndColumnsTest extends RowsAndColumnsTestBase
}
@Nonnull
public static Function<MapOfColumnsRowsAndColumns, ArrayListRowsAndColumns<Object[]>> MAKER = input -> {
public static Function<MapOfColumnsRowsAndColumns, ArrayListRowsAndColumns<Object[]>> MAKER = input -> buildRAC(input);
public static ArrayListRowsAndColumns<Object[]> buildRAC(MapOfColumnsRowsAndColumns input)
{
ArrayList<Object[]> rows = new ArrayList<>(input.numRows());
ArrayList<String> cols = new ArrayList<>(input.getColumnNames());
@ -47,7 +56,7 @@ public class ArrayListRowsAndColumnsTest extends RowsAndColumnsTestBase
for (int colIndex = 0; colIndex < cols.size(); ++colIndex) {
String col = cols.get(colIndex);
final ColumnAccessor column = input.findColumn(col).toAccessor();
final ColumnAccessor column = Objects.requireNonNull(input.findColumn(col)).toAccessor();
sigBob.add(col, column.getType());
for (int i = 0; i < column.numRows(); ++i) {
@ -66,5 +75,29 @@ public class ArrayListRowsAndColumnsTest extends RowsAndColumnsTestBase
},
sigBob.build()
);
};
}
@Test
public void testChildRAC()
{
MapOfColumnsRowsAndColumns input = MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"colA", new IntArrayColumn(new int[]{1, 1, 1, 1, 2, 2, 2, 2, 2, 2}),
"colB", new IntArrayColumn(new int[]{3, 3, 4, 4, 5, 5, 5, 6, 6, 7})
)
);
ArrayListRowsAndColumns rac = ArrayListRowsAndColumnsTest.buildRAC(input);
ArrayList<RowsAndColumns> childRACs = rac.toClusteredGroupPartitioner()
.partitionOnBoundaries(Collections.singletonList("colA"));
Assert.assertEquals(2, childRACs.size());
ArrayListRowsAndColumns childRAC = (ArrayListRowsAndColumns) childRACs.get(1);
ArrayListRowsAndColumns curChildRAC = (ArrayListRowsAndColumns) childRAC.toClusteredGroupPartitioner()
.partitionOnBoundaries(Collections.singletonList(
"colB"))
.get(0);
Assert.assertEquals(5, curChildRAC.findColumn("colB").toAccessor().getInt(0));
}
}

View File

@ -61,7 +61,6 @@ import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.column.ValueType;
@ -123,7 +122,6 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
throw Util.unexpected(windowOrId.getKind());
}
@Nullable
SqlNode lowerBound = targetWindow.getLowerBound();
@Nullable
@ -135,6 +133,17 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
);
}
if (lowerBound != null && upperBound == null) {
if (lowerBound.getKind() == SqlKind.FOLLOWING || SqlWindow.isUnboundedFollowing(lowerBound)) {
upperBound = lowerBound;
lowerBound = SqlWindow.createCurrentRow(SqlParserPos.ZERO);
} else {
upperBound = SqlWindow.createCurrentRow(SqlParserPos.ZERO);
}
targetWindow.setLowerBound(lowerBound);
targetWindow.setUpperBound(upperBound);
}
boolean hasBounds = lowerBound != null || upperBound != null;
if (call.getKind() == SqlKind.NTILE && hasBounds) {
throw buildCalciteContextException(
@ -152,18 +161,13 @@ public class DruidSqlValidator extends BaseDruidSqlValidator
}
}
if (plannerContext.queryContext().isWindowingStrictValidation()) {
if (!targetWindow.isRows() &&
(!isUnboundedOrCurrent(lowerBound) || !isUnboundedOrCurrent(upperBound))) {
// this limitation can be lifted when https://github.com/apache/druid/issues/15767 is addressed
throw buildCalciteContextException(
StringUtils.format(
"The query contains a window frame which may return incorrect results. To disregard this warning, set [%s] to false in the query context.",
QueryContexts.WINDOWING_STRICT_VALIDATION
),
windowOrId
);
}
if (!targetWindow.isRows() &&
(!isUnboundedOrCurrent(lowerBound) || !isUnboundedOrCurrent(upperBound))) {
// this limitation can be lifted when https://github.com/apache/druid/issues/15767 is addressed
throw buildCalciteContextException(
"Order By with RANGE clause currently supports only UNBOUNDED or CURRENT ROW. Use ROWS clause instead.",
windowOrId
);
}
super.validateWindow(windowOrId, scope, call);

View File

@ -15546,7 +15546,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql("SELECT dim1,ROW_NUMBER() OVER (ORDER BY dim1 RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) from druid.foo")
.run());
assertThat(e, invalidSqlIs("The query contains a window frame which may return incorrect results. To disregard this warning, set [windowingStrictValidation] to false in the query context. (line [1], column [31])"));
assertThat(e, invalidSqlIs("Order By with RANGE clause currently supports only UNBOUNDED or CURRENT ROW. Use ROWS clause instead. (line [1], column [31])"));
}
@Test

View File

@ -201,8 +201,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
.sql(testCase.getSql())
.queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true,
QueryContexts.WINDOWING_STRICT_VALIDATION, false
QueryContexts.ENABLE_DEBUG, true
))
.addCustomVerification(QueryVerification.ofResults(testCase))
.run();
@ -224,8 +223,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
.sql(testCase.getSql())
.queryContext(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true,
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000",
QueryContexts.WINDOWING_STRICT_VALIDATION, false
QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000"
)
)
.addCustomVerification(QueryVerification.ofResults(testCase))
@ -246,8 +244,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
)
.queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true,
QueryContexts.WINDOWING_STRICT_VALIDATION, false
QueryContexts.ENABLE_DEBUG, true
))
.expectedResults(ImmutableList.of(
new Object[]{1L},
@ -269,8 +266,7 @@ public class CalciteWindowQueryTest extends BaseCalciteQueryTest
"FROM \"wikipedia\"")
.queryContext(ImmutableMap.of(
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.ENABLE_DEBUG, true,
QueryContexts.WINDOWING_STRICT_VALIDATION, false
QueryContexts.ENABLE_DEBUG, true
))
.expectedResults(ImmutableList.of(
new Object[]{1L, 1L}

View File

@ -0,0 +1,82 @@
type: "operatorValidation"
sql: |
SELECT
dim2,
count(*) OVER (partition by dim2 ORDER BY dim1 ROWS UNBOUNDED PRECEDING),
count(*) OVER (partition by dim2 ORDER BY dim1 ROWS 1 PRECEDING),
count(*) OVER (partition by dim2 ORDER BY dim1 ROWS CURRENT ROW),
count(*) OVER (partition by dim2 ORDER BY dim1 ROWS 1 FOLLOWING),
count(*) OVER (partition by dim2 ORDER BY dim1 ROWS UNBOUNDED FOLLOWING)
FROM numfoo
WHERE dim2 IN ('a', 'abc')
GROUP BY dim2, dim1
expectedOperators:
- {"type":"naiveSort","columns":[{"column":"_d1","direction":"ASC"},{"column":"_d0","direction":"ASC"}]}
- { type: "naivePartition", partitionColumns: [ "_d1" ] }
- type: "window"
processor:
type: "framedAgg"
frame:
peerType: "ROWS"
lowUnbounded: true
lowOffset: 0
uppUnbounded: false
uppOffset: 0
orderBy: null
aggregations:
- { type: "count", name: "w0" }
- type: "window"
processor:
type: "framedAgg"
frame:
peerType: "ROWS"
lowUnbounded: false
lowOffset: -1
uppUnbounded: false
uppOffset: 0
orderBy: null
aggregations:
- { type: "count", name: "w1" }
- type: "window"
processor:
type: "framedAgg"
frame:
peerType: "ROWS"
lowUnbounded: false
lowOffset: 0
uppUnbounded: false
uppOffset: 0
orderBy: null
aggregations:
- { type: "count", name: "w2" }
- type: "window"
processor:
type: "framedAgg"
frame:
peerType: "ROWS"
lowUnbounded: false
lowOffset: 0
uppUnbounded: false
uppOffset: 1
orderBy: null
aggregations:
- { type: "count", name: "w3" }
- type: "window"
processor:
type: "framedAgg"
frame:
peerType: "ROWS"
lowUnbounded: false
lowOffset: 0
uppUnbounded: true
uppOffset: 0
orderBy: null
aggregations:
- { type: "count", name: "w4" }
expectedResults:
- ["a",1,1,1,2,2]
- ["a",2,2,1,1,1]
- ["abc",1,1,1,1,1]

View File

@ -9,16 +9,13 @@ sql: |
COUNT(1) OVER (ORDER BY FLOOR(m1/3) RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
'postfix',
COUNT(1) OVER (ORDER BY FLOOR(m1/3) ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING),
COUNT(1) OVER (ORDER BY FLOOR(m1/3) RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING),
'k(1)',
COUNT(1) OVER (ORDER BY FLOOR(m1/3) ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
COUNT(1) OVER (ORDER BY FLOOR(m1/3) RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING)
COUNT(1) OVER (ORDER BY FLOOR(m1/3) RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
FROM foo
expectedResults:
- [0.0,1,"prefix",1,2,"postfix",6,6,"k(1)",2,5]
- [0.0,1,"prefix",2,2,"postfix",5,6,"k(1)",3,5]
- [1.0,2,"prefix",3,5,"postfix",4,4,"k(1)",3,6]
- [1.0,2,"prefix",4,5,"postfix",3,4,"k(1)",3,6]
- [1.0,2,"prefix",5,5,"postfix",2,4,"k(1)",3,6]
- [2.0,3,"prefix",6,6,"postfix",1,1,"k(1)",2,4]
- [0.0,1,"prefix",1,2,"postfix",6,6]
- [0.0,1,"prefix",2,2,"postfix",5,6]
- [1.0,2,"prefix",3,5,"postfix",4,4]
- [1.0,2,"prefix",4,5,"postfix",3,4]
- [1.0,2,"prefix",5,5,"postfix",2,4]
- [2.0,3,"prefix",6,6,"postfix",1,1]

View File

@ -5,7 +5,7 @@ sql: |
countryIsoCode,
CAST (FLOOR(__time TO HOUR) AS BIGINT) t,
SUM(delta) delta,
SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta,
SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) ROWS BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta,
ROW_NUMBER() OVER (PARTITION BY CAST (FLOOR(__time TO HOUR) AS BIGINT) ORDER BY SUM(delta)) AS hourlyRank
FROM wikipedia
GROUP BY 1, 2
@ -16,14 +16,12 @@ expectedOperators:
processor:
type: "framedAgg"
frame:
peerType: "RANGE"
peerType: "ROWS"
lowUnbounded: false
lowOffset: -3
uppUnbounded: false
uppOffset: 2
orderBy:
- column: d1
direction: ASC
orderBy: null
aggregations:
- { type: "longSum", name: "w0", fieldName: "a0" }
- { type: "naiveSort", columns: [ { column: "d1", direction: "ASC" }, { column: "a0", direction: "ASC"} ]}

View File

@ -5,7 +5,7 @@ sql: |
countryIsoCode,
CAST (FLOOR(__time TO HOUR) AS BIGINT) t,
SUM(delta) delta,
SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) DESC RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta,
SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) DESC ROWS BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta,
ROW_NUMBER() OVER (PARTITION BY CAST (FLOOR(__time TO HOUR) AS BIGINT) ORDER BY SUM(delta) DESC) AS hourlyRank
FROM wikipedia
GROUP BY 1, 2

View File

@ -5,7 +5,7 @@ sql: |
countryIsoCode,
CAST (FLOOR(__time TO HOUR) AS BIGINT) t,
SUM(delta) delta,
SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta
SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) ROWS BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta
FROM wikipedia
GROUP BY 1, 2
@ -15,12 +15,12 @@ expectedOperators:
processor:
type: "framedAgg"
frame:
peerType: "RANGE"
peerType: "ROWS"
lowUnbounded: false
lowOffset: -3
uppUnbounded: false
uppOffset: 2
orderBy: [ {column: "d1", direction: ASC} ]
orderBy: null
aggregations:
- { type: "longSum", name: "w0", fieldName: "a0" }