diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java index decf8fb4d80..091dada6a93 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java @@ -110,7 +110,6 @@ public class GroupByQuery extends BaseQuery @Nullable private final List> subtotalsSpec; - private final boolean applyLimitPushDown; private final Function, Sequence> postProcessingFn; private final RowSignature resultRowSignature; @@ -121,6 +120,15 @@ public class GroupByQuery extends BaseQuery @Nullable private final DateTime universalTimestamp; + private final boolean canDoLimitPushDown; + + /** + * A flag to force limit pushdown to historicals. + * Lazily initialized when calling {@link #validateAndGetForceLimitPushDown()}. + */ + @Nullable + private Boolean forceLimitPushDown; + @JsonCreator public GroupByQuery( @JsonProperty("dataSource") DataSource dataSource, @@ -218,7 +226,11 @@ public class GroupByQuery extends BaseQuery this.postProcessingFn = postProcessingFn != null ? postProcessingFn : makePostProcessingFn(); // Check if limit push down configuration is valid and check if limit push down will be applied - this.applyLimitPushDown = determineApplyLimitPushDown(); + this.canDoLimitPushDown = canDoLimitPushDown( + this.limitSpec, + this.havingSpec, + this.subtotalsSpec + ); } @Nullable @@ -409,7 +421,10 @@ public class GroupByQuery extends BaseQuery @JsonIgnore public boolean isApplyLimitPushDown() { - return applyLimitPushDown; + if (forceLimitPushDown == null) { + forceLimitPushDown = validateAndGetForceLimitPushDown(); + } + return forceLimitPushDown || canDoLimitPushDown; } @JsonIgnore @@ -474,14 +489,16 @@ public class GroupByQuery extends BaseQuery .build(); } - private boolean determineApplyLimitPushDown() + private boolean canDoLimitPushDown( + @Nullable LimitSpec limitSpec, + @Nullable HavingSpec havingSpec, + @Nullable List> subtotalsSpec + ) { - if (subtotalsSpec != null) { + if (subtotalsSpec != null && !subtotalsSpec.isEmpty()) { return false; } - final boolean forceLimitPushDown = validateAndGetForceLimitPushDown(); - if (limitSpec instanceof DefaultLimitSpec) { DefaultLimitSpec limitSpecWithoutOffset = ((DefaultLimitSpec) limitSpec).withOffsetToLimit(); @@ -490,10 +507,6 @@ public class GroupByQuery extends BaseQuery return false; } - if (forceLimitPushDown) { - return true; - } - if (!getApplyLimitPushDownFromContext()) { return false; } @@ -612,7 +625,7 @@ public class GroupByQuery extends BaseQuery public Ordering getRowOrdering(final boolean granular) { - if (applyLimitPushDown) { + if (isApplyLimitPushDown()) { if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields((DefaultLimitSpec) limitSpec, dimensions)) { return getRowOrderingForPushDown(granular, (DefaultLimitSpec) limitSpec); } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 8dfac83d5fd..c89e35b4413 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -10417,6 +10417,9 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest @Test public void testGroupByLimitPushDownPostAggNotSupported() { + if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) { + return; + } expectedException.expect(UnsupportedOperationException.class); expectedException.expectMessage("Limit push down when sorting by a post aggregator is not supported."); @@ -10560,7 +10563,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest expectedException.expect(IAE.class); expectedException.expectMessage("Cannot force limit push down when a having spec is present."); - makeQueryBuilder() + final GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setGranularity(QueryRunnerTestHelper.ALL_GRAN) .setDimensions(new DefaultDimensionSpec(QueryRunnerTestHelper.MARKET_DIMENSION, "marketalias")) @@ -10575,6 +10578,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest .overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true)) .setHavingSpec(new GreaterThanHavingSpec("rows", 10)) .build(); + query.isApplyLimitPushDown(); } @Test diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryTest.java index bd6938ec0b8..b6f76b4be16 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryTest.java @@ -134,7 +134,8 @@ public class GroupByQueryTest .suppress(Warning.NULL_FIELDS, Warning.NONFINAL_FIELDS) // Fields derived from other fields are not included in equals/hashCode .withIgnoredFields( - "applyLimitPushDown", + "canDoLimitPushDown", + "forceLimitPushDown", "postProcessingFn", "resultRowSignature", "universalTimestamp" diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 9409f818693..e7b537edc97 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -88,6 +88,7 @@ import org.apache.druid.query.filter.NotDimFilter; import org.apache.druid.query.filter.RegexDimFilter; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; import org.apache.druid.query.groupby.orderby.OrderByColumnSpec; @@ -1540,6 +1541,38 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testGroupByWithForceLimitPushDown() throws Exception + { + final Map context = new HashMap<>(QUERY_CONTEXT_DEFAULT); + context.put(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true); + + testQuery( + "SELECT dim1, dim2, COUNT(*) FROM druid.foo GROUP BY dim1, dim2 limit 1", + context, + ImmutableList.of( + new GroupByQuery.Builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + new DefaultDimensionSpec("dim1", "d0", ValueType.STRING), + new DefaultDimensionSpec("dim2", "d1", ValueType.STRING) + ) + .setLimitSpec( + new DefaultLimitSpec( + ImmutableList.of(), + 1 + ) + ) + .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0"))) + .setContext(context) + .build() + ), + ImmutableList.of(new Object[]{"", "a", 1L}) + ); + } + @Test public void testGroupByLimitWrappingOrderByAgg() throws Exception {