mirror of https://github.com/apache/druid.git
Return empty result when a group by gets optimized to a timeseries query (#12065)
Related to #11188 The above mentioned PR allowed timeseries queries to return a default result, when queries of type: select count(*) from table where dim1="_not_present_dim_" were executed. Before the PR, it returned no row, after the PR, it would return a row with value of count(*) as 0 (as expected by SQL standards of different dbs). In Grouping#applyProject, we can sometimes perform optimization of a groupBy query to a timeseries query if possible (when the keys of the groupBy are constants, as generated by automated tools). For example, in select count(*) from table where dim1="_present_dim_" group by "dummy_key", the groupBy clause can be removed. However, in the case when the filter doesn't return anything, i.e. select count(*) from table where dim1="_not_present_dim_" group by "dummy_key", the behavior of general databases would be to return nothing, while druid (due to above change) returns an empty row. This PR aims to fix this divergence of behavior. Example cases: select count(*) from table where dim1="_not_present_dim_" group by "dummy_key". CURRENT: Returns a row with count(*) = 0 EXPECTED: Return no row select 'A', dim1 from foo where m1 = 123123 and dim1 = '_not_present_again_' group by dim1 CURRENT: Returns a row with ('A', 'wat') EXPECTED: Return no row To do this, a boolean droppedDimensionsWhileApplyingProject has been added to Grouping which is true whenever we make changes to the original shape with optimization. Hence if a timeseries query has a grouping with this set to true, we set skipEmptyBuckets=true in the query context (i.e. donot return any row).
This commit is contained in:
parent
2299eb321e
commit
7c17341caa
|
@ -856,7 +856,9 @@ public class DruidQuery
|
|||
// An aggregation query should return one row per group, with no grouping (e.g. ALL granularity), the entire table
|
||||
// is the group, so we should not skip empty buckets. When there are no results, this means we return the
|
||||
// initialized state for given aggregators instead of nothing.
|
||||
if (!Granularities.ALL.equals(queryGranularity)) {
|
||||
// Alternatively, the timeseries query should return empty buckets, even with ALL granularity when timeseries query
|
||||
// was originally a groupBy query, but with the grouping dimensions removed away in Grouping#applyProject
|
||||
if (!Granularities.ALL.equals(queryGranularity) || grouping.hasGroupingDimensionsDropped()) {
|
||||
theContext.put(TimeseriesQuery.SKIP_EMPTY_BUCKETS, true);
|
||||
}
|
||||
theContext.putAll(plannerContext.getQueryContext());
|
||||
|
|
|
@ -63,6 +63,11 @@ public class Grouping
|
|||
private final DimFilter havingFilter;
|
||||
private final RowSignature outputRowSignature;
|
||||
|
||||
// Denotes whether the original Grouping had more dimensions which were dropped while applying projection to optimize
|
||||
// the grouping. Used for returning result which is consistent with most SQL implementations, by correspondingly
|
||||
// setting/unsetting the SKIP_EMPTY_BUCKETS flag, if the GroupBy query can be reduced to a timeseries query.
|
||||
private final boolean groupingDimensionsDropped;
|
||||
|
||||
private Grouping(
|
||||
final List<DimensionExpression> dimensions,
|
||||
final Subtotals subtotals,
|
||||
|
@ -70,12 +75,25 @@ public class Grouping
|
|||
@Nullable final DimFilter havingFilter,
|
||||
final RowSignature outputRowSignature
|
||||
)
|
||||
{
|
||||
this(dimensions, subtotals, aggregations, havingFilter, outputRowSignature, false);
|
||||
}
|
||||
|
||||
private Grouping(
|
||||
final List<DimensionExpression> dimensions,
|
||||
final Subtotals subtotals,
|
||||
final List<Aggregation> aggregations,
|
||||
@Nullable final DimFilter havingFilter,
|
||||
final RowSignature outputRowSignature,
|
||||
final boolean groupingDimensionsDropped
|
||||
)
|
||||
{
|
||||
this.dimensions = ImmutableList.copyOf(dimensions);
|
||||
this.subtotals = Preconditions.checkNotNull(subtotals, "subtotals");
|
||||
this.aggregations = ImmutableList.copyOf(aggregations);
|
||||
this.havingFilter = havingFilter;
|
||||
this.outputRowSignature = Preconditions.checkNotNull(outputRowSignature, "outputRowSignature");
|
||||
this.groupingDimensionsDropped = groupingDimensionsDropped;
|
||||
|
||||
// Verify no collisions between dimensions, aggregations, post-aggregations.
|
||||
final Set<String> seen = new HashSet<>();
|
||||
|
@ -103,6 +121,27 @@ public class Grouping
|
|||
}
|
||||
}
|
||||
|
||||
// This method is private since groupingDimensionsDropped should only be deviated from default in
|
||||
// applyProject
|
||||
private static Grouping create(
|
||||
final List<DimensionExpression> dimensions,
|
||||
final Subtotals subtotals,
|
||||
final List<Aggregation> aggregations,
|
||||
@Nullable final DimFilter havingFilter,
|
||||
final RowSignature outputRowSignature,
|
||||
final boolean groupingDimensionsDropped
|
||||
)
|
||||
{
|
||||
return new Grouping(
|
||||
dimensions,
|
||||
subtotals,
|
||||
aggregations,
|
||||
havingFilter,
|
||||
outputRowSignature,
|
||||
groupingDimensionsDropped
|
||||
);
|
||||
}
|
||||
|
||||
public static Grouping create(
|
||||
final List<DimensionExpression> dimensions,
|
||||
final Subtotals subtotals,
|
||||
|
@ -160,6 +199,11 @@ public class Grouping
|
|||
return outputRowSignature;
|
||||
}
|
||||
|
||||
public boolean hasGroupingDimensionsDropped()
|
||||
{
|
||||
return groupingDimensionsDropped;
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies a post-grouping projection.
|
||||
*
|
||||
|
@ -187,11 +231,13 @@ public class Grouping
|
|||
// actually want to include a dimension 'dummy'.
|
||||
final ImmutableBitSet aggregateProjectBits = RelOptUtil.InputFinder.bits(project.getChildExps(), null);
|
||||
final int[] newDimIndexes = new int[dimensions.size()];
|
||||
boolean droppedDimensions = false;
|
||||
|
||||
for (int i = 0; i < dimensions.size(); i++) {
|
||||
final DimensionExpression dimension = dimensions.get(i);
|
||||
if (Parser.parse(dimension.getDruidExpression().getExpression(), plannerContext.getExprMacroTable())
|
||||
.isLiteral() && !aggregateProjectBits.get(i)) {
|
||||
droppedDimensions = true;
|
||||
newDimIndexes[i] = -1;
|
||||
} else {
|
||||
newDimIndexes[i] = newDimensions.size();
|
||||
|
@ -225,7 +271,8 @@ public class Grouping
|
|||
newSubtotals,
|
||||
newAggregations,
|
||||
havingFilter,
|
||||
postAggregationProjection.getOutputRowSignature()
|
||||
postAggregationProjection.getOutputRowSignature(),
|
||||
droppedDimensions
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -243,12 +290,20 @@ public class Grouping
|
|||
subtotals.equals(grouping.subtotals) &&
|
||||
aggregations.equals(grouping.aggregations) &&
|
||||
Objects.equals(havingFilter, grouping.havingFilter) &&
|
||||
outputRowSignature.equals(grouping.outputRowSignature);
|
||||
outputRowSignature.equals(grouping.outputRowSignature) &&
|
||||
groupingDimensionsDropped == grouping.groupingDimensionsDropped;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(dimensions, subtotals, aggregations, havingFilter, outputRowSignature);
|
||||
return Objects.hash(
|
||||
dimensions,
|
||||
subtotals,
|
||||
aggregations,
|
||||
havingFilter,
|
||||
outputRowSignature,
|
||||
groupingDimensionsDropped
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3911,12 +3911,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
new CountAggregatorFactory("a0"),
|
||||
new LongMaxAggregatorFactory("a1", "cnt")
|
||||
))
|
||||
.context(QUERY_CONTEXT_DEFAULT)
|
||||
.context(QUERY_CONTEXT_DO_SKIP_EMPTY_BUCKETS)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{0L, NullHandling.sqlCompatible() ? null : Long.MIN_VALUE}
|
||||
)
|
||||
ImmutableList.of()
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -13375,4 +13373,111 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
ImmutableList.of()
|
||||
);
|
||||
}
|
||||
|
||||
// When optimization in Grouping#applyProject is applied, and it reduces a Group By query to a timeseries, we
|
||||
// want it to return empty bucket if no row matches
|
||||
@Test
|
||||
public void testReturnEmptyRowWhenGroupByIsConvertedToTimeseriesWithSingleConstantDimension() throws Exception
|
||||
{
|
||||
skipVectorize();
|
||||
testQuery(
|
||||
"SELECT 'A' from foo WHERE m1 = 50 AND dim1 = 'wat' GROUP BY 'foobar'",
|
||||
ImmutableList.of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE1)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.filters(
|
||||
and(
|
||||
selector("m1", "50", null),
|
||||
selector("dim1", "wat", null)
|
||||
)
|
||||
)
|
||||
.granularity(Granularities.ALL)
|
||||
.postAggregators(
|
||||
new ExpressionPostAggregator("p0", "'A'", null, ExprMacroTable.nil())
|
||||
)
|
||||
.context(QUERY_CONTEXT_DO_SKIP_EMPTY_BUCKETS)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of()
|
||||
);
|
||||
|
||||
|
||||
// dim1 is not getting reduced to 'wat' in this case in Calcite (ProjectMergeRule is not getting applied),
|
||||
// therefore the query is not optimized to a timeseries query
|
||||
testQuery(
|
||||
"SELECT 'A' from foo WHERE dim1 = 'wat' GROUP BY dim1",
|
||||
ImmutableList.of(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(
|
||||
"foo"
|
||||
)
|
||||
.setInterval(querySegmentSpec(Intervals.ETERNITY))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.addDimension(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
|
||||
.setDimFilter(selector("dim1", "wat", null))
|
||||
.setPostAggregatorSpecs(
|
||||
ImmutableList.of(
|
||||
new ExpressionPostAggregator("p0", "'A'", null, ExprMacroTable.nil())
|
||||
)
|
||||
)
|
||||
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReturnEmptyRowWhenGroupByIsConvertedToTimeseriesWithMutlipleConstantDimensions() throws Exception
|
||||
{
|
||||
skipVectorize();
|
||||
testQuery(
|
||||
"SELECT 'A', dim1 from foo WHERE m1 = 50 AND dim1 = 'wat' GROUP BY dim1",
|
||||
ImmutableList.of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE1)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.filters(
|
||||
and(
|
||||
selector("m1", "50", null),
|
||||
selector("dim1", "wat", null)
|
||||
)
|
||||
)
|
||||
.granularity(Granularities.ALL)
|
||||
.postAggregators(
|
||||
new ExpressionPostAggregator("p0", "'A'", null, ExprMacroTable.nil()),
|
||||
new ExpressionPostAggregator("p1", "'wat'", null, ExprMacroTable.nil())
|
||||
)
|
||||
.context(QUERY_CONTEXT_DO_SKIP_EMPTY_BUCKETS)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of()
|
||||
);
|
||||
|
||||
// Sanity test, that even when dimensions are reduced, but should produce a valid result (i.e. when filters are
|
||||
// correct, then they should
|
||||
testQuery(
|
||||
"SELECT 'A', dim1 from foo WHERE m1 = 2.0 AND dim1 = '10.1' GROUP BY dim1",
|
||||
ImmutableList.of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE1)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.filters(
|
||||
and(
|
||||
selector("m1", "2.0", null),
|
||||
selector("dim1", "10.1", null)
|
||||
)
|
||||
)
|
||||
.granularity(Granularities.ALL)
|
||||
.postAggregators(
|
||||
new ExpressionPostAggregator("p0", "'A'", null, ExprMacroTable.nil()),
|
||||
new ExpressionPostAggregator("p1", "'10.1'", null, ExprMacroTable.nil())
|
||||
)
|
||||
.context(QUERY_CONTEXT_DO_SKIP_EMPTY_BUCKETS)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(new Object[]{"A", "10.1"})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue