Allow forceLimitPushDown in SQL (#10253)

* Allow forceLimitPushDown in SQL

* fix test

* fix test

* review comments

* fix test
This commit is contained in:
Jihoon Son 2020-08-13 13:30:41 -07:00 committed by GitHub
parent 748a83cb78
commit a61263b4a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 14 deletions

View File

@ -110,7 +110,6 @@ public class GroupByQuery extends BaseQuery<ResultRow>
@Nullable
private final List<List<String>> subtotalsSpec;
private final boolean applyLimitPushDown;
private final Function<Sequence<ResultRow>, Sequence<ResultRow>> postProcessingFn;
private final RowSignature resultRowSignature;
@ -121,6 +120,15 @@ public class GroupByQuery extends BaseQuery<ResultRow>
@Nullable
private final DateTime universalTimestamp;
private final boolean canDoLimitPushDown;
/**
* A flag to force limit pushdown to historicals.
* Lazily initialized when calling {@link #validateAndGetForceLimitPushDown()}.
*/
@Nullable
private Boolean forceLimitPushDown;
@JsonCreator
public GroupByQuery(
@JsonProperty("dataSource") DataSource dataSource,
@ -218,7 +226,11 @@ public class GroupByQuery extends BaseQuery<ResultRow>
this.postProcessingFn = postProcessingFn != null ? postProcessingFn : makePostProcessingFn();
// Check if limit push down configuration is valid and check if limit push down will be applied
this.applyLimitPushDown = determineApplyLimitPushDown();
this.canDoLimitPushDown = canDoLimitPushDown(
this.limitSpec,
this.havingSpec,
this.subtotalsSpec
);
}
@Nullable
@ -409,7 +421,10 @@ public class GroupByQuery extends BaseQuery<ResultRow>
@JsonIgnore
public boolean isApplyLimitPushDown()
{
return applyLimitPushDown;
if (forceLimitPushDown == null) {
forceLimitPushDown = validateAndGetForceLimitPushDown();
}
return forceLimitPushDown || canDoLimitPushDown;
}
@JsonIgnore
@ -474,14 +489,16 @@ public class GroupByQuery extends BaseQuery<ResultRow>
.build();
}
private boolean determineApplyLimitPushDown()
private boolean canDoLimitPushDown(
@Nullable LimitSpec limitSpec,
@Nullable HavingSpec havingSpec,
@Nullable List<List<String>> subtotalsSpec
)
{
if (subtotalsSpec != null) {
if (subtotalsSpec != null && !subtotalsSpec.isEmpty()) {
return false;
}
final boolean forceLimitPushDown = validateAndGetForceLimitPushDown();
if (limitSpec instanceof DefaultLimitSpec) {
DefaultLimitSpec limitSpecWithoutOffset = ((DefaultLimitSpec) limitSpec).withOffsetToLimit();
@ -490,10 +507,6 @@ public class GroupByQuery extends BaseQuery<ResultRow>
return false;
}
if (forceLimitPushDown) {
return true;
}
if (!getApplyLimitPushDownFromContext()) {
return false;
}
@ -612,7 +625,7 @@ public class GroupByQuery extends BaseQuery<ResultRow>
public Ordering<ResultRow> getRowOrdering(final boolean granular)
{
if (applyLimitPushDown) {
if (isApplyLimitPushDown()) {
if (!DefaultLimitSpec.sortingOrderHasNonGroupingFields((DefaultLimitSpec) limitSpec, dimensions)) {
return getRowOrderingForPushDown(granular, (DefaultLimitSpec) limitSpec);
}

View File

@ -10417,6 +10417,9 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
@Test
public void testGroupByLimitPushDownPostAggNotSupported()
{
if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
return;
}
expectedException.expect(UnsupportedOperationException.class);
expectedException.expectMessage("Limit push down when sorting by a post aggregator is not supported.");
@ -10560,7 +10563,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
expectedException.expect(IAE.class);
expectedException.expectMessage("Cannot force limit push down when a having spec is present.");
makeQueryBuilder()
final GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.setDimensions(new DefaultDimensionSpec(QueryRunnerTestHelper.MARKET_DIMENSION, "marketalias"))
@ -10575,6 +10578,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
.overrideContext(ImmutableMap.of(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true))
.setHavingSpec(new GreaterThanHavingSpec("rows", 10))
.build();
query.isApplyLimitPushDown();
}
@Test

View File

@ -134,7 +134,8 @@ public class GroupByQueryTest
.suppress(Warning.NULL_FIELDS, Warning.NONFINAL_FIELDS)
// Fields derived from other fields are not included in equals/hashCode
.withIgnoredFields(
"applyLimitPushDown",
"canDoLimitPushDown",
"forceLimitPushDown",
"postProcessingFn",
"resultRowSignature",
"universalTimestamp"

View File

@ -88,6 +88,7 @@ import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.RegexDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
@ -1540,6 +1541,38 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
@Test
public void testGroupByWithForceLimitPushDown() throws Exception
{
final Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
context.put(GroupByQueryConfig.CTX_KEY_FORCE_LIMIT_PUSH_DOWN, true);
testQuery(
"SELECT dim1, dim2, COUNT(*) FROM druid.foo GROUP BY dim1, dim2 limit 1",
context,
ImmutableList.of(
new GroupByQuery.Builder()
.setDataSource(CalciteTests.DATASOURCE1)
.setInterval(querySegmentSpec(Filtration.eternity()))
.setGranularity(Granularities.ALL)
.setDimensions(
new DefaultDimensionSpec("dim1", "d0", ValueType.STRING),
new DefaultDimensionSpec("dim2", "d1", ValueType.STRING)
)
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(),
1
)
)
.setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
.setContext(context)
.build()
),
ImmutableList.of(new Object[]{"", "a", 1L})
);
}
@Test
public void testGroupByLimitWrappingOrderByAgg() throws Exception
{