groupBy: GroupByRowProcessor fixes, invert subquery context overrides. (#3502)

- Fix GroupByRowProcessor config overrides
- Fix GroupByRowProcessor resource limit checking
- Invert subquery context overrides such that for the subquery, its own
  keys override keys from the outer query, not the other way around.

The last bit is necessary for the test to work, and seems like a better
way to do it anyway.
This commit is contained in:
Gian Merlino 2016-09-23 14:41:09 -07:00 committed by Fangjin Yang
parent 7195be32d8
commit d5a8a35fec
3 changed files with 70 additions and 4 deletions

View File

@ -138,9 +138,22 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
final DataSource dataSource = query.getDataSource(); final DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) { if (dataSource instanceof QueryDataSource) {
GroupByQuery subquery; final GroupByQuery subquery;
try { try {
subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery().withOverriddenContext(query.getContext()); // Inject outer query context keys into subquery if they don't already exist in the subquery context.
// Unlike withOverriddenContext's normal behavior, we want keys present in the subquery to win.
final Map<String, Object> subqueryContext = Maps.newTreeMap();
if (query.getContext() != null) {
for (Map.Entry<String, Object> entry : query.getContext().entrySet()) {
if (entry.getValue() != null) {
subqueryContext.put(entry.getKey(), entry.getValue());
}
}
}
if (((QueryDataSource) dataSource).getQuery().getContext() != null) {
subqueryContext.putAll(((QueryDataSource) dataSource).getQuery().getContext());
}
subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery().withOverriddenContext(subqueryContext);
} }
catch (ClassCastException e) { catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries must be of type 'group by'"); throw new UnsupportedOperationException("Subqueries must be of type 'group by'");

View File

@ -35,6 +35,7 @@ import io.druid.data.input.Row;
import io.druid.query.Query; import io.druid.query.Query;
import io.druid.query.QueryContextKeys; import io.druid.query.QueryContextKeys;
import io.druid.query.QueryInterruptedException; import io.druid.query.QueryInterruptedException;
import io.druid.query.ResourceLimitExceededException;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.filter.DruidLongPredicate; import io.druid.query.filter.DruidLongPredicate;
import io.druid.query.filter.DruidPredicateFactory; import io.druid.query.filter.DruidPredicateFactory;
@ -147,7 +148,7 @@ public class GroupByRowProcessor
Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair( Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
query, query,
true, true,
config, querySpecificConfig,
mergeBufferHolder.get(), mergeBufferHolder.get(),
-1, -1,
temporaryStorage, temporaryStorage,
@ -158,7 +159,10 @@ public class GroupByRowProcessor
final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs; final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
closeOnFailure.add(grouper); closeOnFailure.add(grouper);
filteredSequence.accumulate(grouper, accumulator); final Grouper<RowBasedKey> retVal = filteredSequence.accumulate(grouper, accumulator);
if (retVal != grouper) {
throw new ResourceLimitExceededException("Grouping resources exhausted");
}
return RowBasedGrouperHelper.makeGrouperIterator( return RowBasedGrouperHelper.makeGrouperIterator(
grouper, grouper,

View File

@ -1073,6 +1073,55 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, ""); TestHelper.assertExpectedObjects(expectedResults, results, "");
} }
@Test
public void testSubqueryWithOuterMaxOnDiskStorageContextOverride()
{
final GroupByQuery subquery = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setGranularity(QueryRunnerTestHelper.dayGran)
.setLimitSpec(
new DefaultLimitSpec(
ImmutableList.of(new OrderByColumnSpec("alias", OrderByColumnSpec.Direction.ASCENDING)),
null
)
)
.setContext(
ImmutableMap.<String, Object>of(
"maxOnDiskStorage", Integer.MAX_VALUE,
"bufferGrouperMaxSize", Integer.MAX_VALUE
)
)
.build();
final GroupByQuery query = GroupByQuery
.builder()
.setDataSource(subquery)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList())
.setAggregatorSpecs(ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("count")))
.setGranularity(QueryRunnerTestHelper.allGran)
.setContext(ImmutableMap.<String, Object>of("maxOnDiskStorage", 0, "bufferGrouperMaxSize", 0))
.build();
// v1 strategy throws an exception for this query because it tries to merge the noop outer
// and default inner limit specs, then apply the resulting spec to the outer query, which
// fails because the inner limit spec refers to columns that don't exist in the outer
// query. I'm not sure why it does this.
if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
expectedException.expect(ISE.class);
expectedException.expectMessage("Unknown column in order clause");
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
} else {
expectedException.expect(ResourceLimitExceededException.class);
expectedException.expectMessage("Grouping resources exhausted");
GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
}
}
@Test @Test
public void testGroupByWithRebucketRename() public void testGroupByWithRebucketRename()
{ {