Add validation for aggregations on __time (#13793)

* Add validation for aggregations on __time
This commit is contained in:
Adarsh Sanjeev 2023-03-08 06:46:36 +05:30 committed by GitHub
parent faac43eabe
commit ef82756176
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 6 deletions

View File

@ -86,7 +86,7 @@ In the aggregation functions supported by Druid, only `COUNT`, `ARRAY_AGG`, and
|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`EARLIEST_BY(expr, timestampExpr)`|Returns the earliest value of `expr`, which must be numeric. The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`. If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`EARLIEST_BY(expr, timestampExpr, maxBytesPerString)`| Like `EARLIEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "latest" is taken from the row with the overall latest non-null value of the timestamp column. If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. The `expr` must come from a relation with a timestamp column (like `__time` in a Druid datasource). The "latest" is taken from the row with the overall latest non-null value of the timestamp column. If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If the relation has no timestamp column (for example, a lookup or an `EXTERN` input), the query fails validation; use `LATEST_BY` and specify the timestamp expression instead.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`LATEST_BY(expr, timestampExpr)`|Returns the latest value of `expr`, which must be numeric. The latest value of `expr` is taken from the row with the overall latest non-null value of `timestampExpr`. If the overall latest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`LATEST_BY(expr, timestampExpr, maxBytesPerString)`|Like `LATEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|

View File

@ -1363,6 +1363,39 @@ public class MSQSelectTest extends MSQTestBase
.verifyResults();
}
/**
 * Checks that {@code LATEST()} over an {@code EXTERN} input is reported as a planning error.
 *
 * The extern signature declared in the query only exposes {@code timestamp}, {@code page} and
 * {@code user}, so the test expects a validation error whose message mentions the dependency
 * of {@code LATEST()} on the {@code __time} column.
 *
 * @throws IOException if the sample resource cannot be copied or its path cannot be serialized
 */
@Test
public void testTimeColumnAggregationFromExtern() throws IOException
{
  // Materialize the bundled sample as a temporary file and JSON-quote its absolute path so it
  // can be embedded in the EXTERN input source spec below.
  final File sampleFile =
      MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
  final String sampleFilePathJson =
      queryFramework().queryJsonMapper().writeValueAsString(sampleFile.getAbsolutePath());

  final String sql =
      "WITH\n"
      + "kttm_data AS (\n"
      + "SELECT * FROM TABLE(\n"
      + " EXTERN(\n"
      + " '{ \"files\": [" + sampleFilePathJson + "],\"type\":\"local\"}',\n"
      + " '{\"type\":\"json\"}',\n"
      + " '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
      + " )\n"
      + "))\n"
      + "\n"
      + "SELECT\n"
      + " FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
      + " LATEST(\"page\") AS \"page\"\n"
      + "FROM kttm_data "
      + "GROUP BY 1";

  final RowSignature expectedSignature =
      RowSignature.builder()
                  .add("__time", ColumnType.LONG)
                  .add("cnt", ColumnType.LONG)
                  .build();

  testSelectQuery()
      .setSql(sql)
      .setExpectedValidationErrorMatcher(
          ThrowableMessageMatcher.hasMessage(
              CoreMatchers.containsString("LATEST() aggregator depends on __time column")
          )
      )
      .setExpectedRowSignature(expectedSignature)
      .verifyPlanningErrors();
}
@Test
public void testGroupByWithMultiValueMvToArrayWithoutGroupByEnable()

View File

@ -51,6 +51,7 @@ import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
@ -196,6 +197,16 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
final String fieldName = getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0));
if (!rowSignature.contains(ColumnHolder.TIME_COLUMN_NAME) && (aggregatorType == AggregatorType.LATEST || aggregatorType == AggregatorType.EARLIEST)) {
plannerContext.setPlanningError("%s() aggregator depends on __time column, the underlying datasource "
+ "or extern function you are querying doesn't contain __time column, "
+ "Please use %s_BY() and specify the time column you want to use",
aggregatorType.name(),
aggregatorType.name()
);
return null;
}
final AggregatorFactory theAggFactory;
switch (args.size()) {
case 1:

View File

@ -54,7 +54,6 @@ import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -86,6 +85,7 @@ import org.apache.druid.server.security.Access;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.UnsupportedSQLQueryException;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
@ -1412,7 +1412,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
"SELECT dim1, dim2, t1.v, t1.v\n"
+ "FROM foo\n"
+ "INNER JOIN \n"
+ " (SELECT SUBSTRING(k, 1, 1) k, LATEST(v, 10) v FROM lookup.lookyloo GROUP BY 1) t1\n"
+ " (SELECT SUBSTRING(k, 1, 1) k, ANY_VALUE(v, 10) v FROM lookup.lookyloo GROUP BY 1) t1\n"
+ " ON foo.dim2 = t1.k",
queryContext,
ImmutableList.of(
@ -1433,7 +1433,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
new SubstringDimExtractionFn(0, 1)
)
)
.setAggregatorSpecs(new StringLastAggregatorFactory("a0", "v", null, 10))
.setAggregatorSpecs(new StringAnyAggregatorFactory("a0", "v", 10))
.build()
),
"j0.",
@ -1447,12 +1447,24 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
.build()
),
ImmutableList.of(
new Object[]{"", "a", "xabc", "xabc"},
new Object[]{"1", "a", "xabc", "xabc"}
new Object[]{"", "a", "xa", "xa"},
new Object[]{"1", "a", "xa", "xa"}
)
);
}
/**
 * Runs {@code LATEST(v)} grouped by {@code k} against {@code lookup.lookyloo} and expects the
 * query to fail with {@link UnsupportedSQLQueryException}.
 *
 * @param queryContext query context supplied by {@link QueryContextForJoinProvider}
 */
@Test(expected = UnsupportedSQLQueryException.class)
@Parameters(source = QueryContextForJoinProvider.class)
public void testTimeColumnAggregationsOnLookups(Map<String, Object> queryContext)
{
  final String sql = "SELECT k, LATEST(v) v FROM lookup.lookyloo GROUP BY k";

  // The expected-queries and expected-results lists are left empty: the test passes only if
  // the declared exception is thrown.
  testQuery(sql, queryContext, ImmutableList.of(), ImmutableList.of());
}
@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testInnerJoinQueryOfLookupRemovable(Map<String, Object> queryContext)