diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
index ad667cb4a06..d9996e72785 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -138,9 +138,22 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
       final GroupByQuery subquery;
       try {
-        subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery();
+        final Map<String, Object> subqueryContext = Maps.newTreeMap();
+        if (query.getContext() != null) {
+          for (Map.Entry<String, Object> entry : query.getContext().entrySet()) {
+            if (entry.getValue() != null) {
+              subqueryContext.put(entry.getKey(), entry.getValue());
+            }
+          }
+        }
+        if (((QueryDataSource) dataSource).getQuery().getContext() != null) {
+          subqueryContext.putAll(((QueryDataSource) dataSource).getQuery().getContext());
+        }
+        subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery().withOverriddenContext(subqueryContext);
       }
       catch (ClassCastException e) {
         throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
index e5af021dfd8..df9d334615b 100644
--- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
+++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java
@@ -35,6 +35,7 @@ import io.druid.data.input.Row;
 import io.druid.query.Query;
 import io.druid.query.QueryContextKeys;
 import io.druid.query.QueryInterruptedException;
+import io.druid.query.ResourceLimitExceededException;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.filter.DruidLongPredicate;
 import io.druid.query.filter.DruidPredicateFactory;
@@ -147,7 +148,7 @@ public class GroupByRowProcessor
     Pair<Grouper<RowBasedKey>, Accumulator<Grouper<RowBasedKey>, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
         query,
         true,
-        config,
+        querySpecificConfig,
         mergeBufferHolder.get(),
         -1,
         temporaryStorage,
@@ -158,7 +159,10 @@ public class GroupByRowProcessor
     final Accumulator<Grouper<RowBasedKey>, Row> accumulator = pair.rhs;
     closeOnFailure.add(grouper);
 
-    filteredSequence.accumulate(grouper, accumulator);
+    final Grouper<RowBasedKey> retVal = filteredSequence.accumulate(grouper, accumulator);
+    if (retVal != grouper) {
+      throw new ResourceLimitExceededException("Grouping resources exhausted");
+    }
 
     return RowBasedGrouperHelper.makeGrouperIterator(
         grouper,
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
index 662de75269d..b2f416a4742 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -1073,6 +1073,55 @@ public class GroupByQueryRunnerTest
     TestHelper.assertExpectedObjects(expectedResults, results, "");
   }
 
+  @Test
+  public void testSubqueryWithOuterMaxOnDiskStorageContextOverride()
+  {
+    final GroupByQuery subquery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.fullOnInterval)
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .setLimitSpec(
+            new DefaultLimitSpec(
+                ImmutableList.of(new OrderByColumnSpec("alias", OrderByColumnSpec.Direction.ASCENDING)),
+                null
+            )
+        )
+        .setContext(
+            ImmutableMap.<String, Object>of(
+                "maxOnDiskStorage", Integer.MAX_VALUE,
+                "bufferGrouperMaxSize", Integer.MAX_VALUE
+            )
+        )
+        .build();
+
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(subquery)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
+        .setDimensions(Lists.<DimensionSpec>newArrayList())
+        .setAggregatorSpecs(ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("count")))
+        .setGranularity(QueryRunnerTestHelper.allGran)
+        .setContext(ImmutableMap.<String, Object>of("maxOnDiskStorage", 0, "bufferGrouperMaxSize", 0))
+        .build();
+
+    // v1 strategy throws an exception for this query because it tries to merge the noop outer
+    // and default inner limit specs, then apply the resulting spec to the outer query, which
+    // fails because the inner limit spec refers to columns that don't exist in the outer
+    // query. I'm not sure why it does this.
+
+    if (config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)) {
+      expectedException.expect(ISE.class);
+      expectedException.expectMessage("Unknown column in order clause");
+      GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    } else {
+      expectedException.expect(ResourceLimitExceededException.class);
+      expectedException.expectMessage("Grouping resources exhausted");
+      GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    }
+  }
+
   @Test
   public void testGroupByWithRebucketRename()
   {
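
Note on the context-merge rule in the GroupByQueryQueryToolChest hunk: the outer query's non-null context entries are copied into the subquery's context first, and the subquery's own entries are then layered on top, so a key the subquery sets itself wins while missing keys are inherited from the outer query. The following is a minimal standalone sketch of that rule, not part of the patch; the class and method names are made up for this note, and it has no Druid dependencies.

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; it restates the merge order used in the
// GroupByQueryQueryToolChest hunk above.
public class SubqueryContextMergeSketch
{
  static Map<String, Object> mergeContexts(Map<String, Object> outerContext, Map<String, Object> subqueryContext)
  {
    final Map<String, Object> merged = new TreeMap<>();
    // Copy non-null entries from the outer query first...
    if (outerContext != null) {
      for (Map.Entry<String, Object> entry : outerContext.entrySet()) {
        if (entry.getValue() != null) {
          merged.put(entry.getKey(), entry.getValue());
        }
      }
    }
    // ...then layer the subquery's own context on top, so its keys win.
    if (subqueryContext != null) {
      merged.putAll(subqueryContext);
    }
    return merged;
  }

  public static void main(String[] args)
  {
    final Map<String, Object> outer = new HashMap<>();
    outer.put("maxOnDiskStorage", 0);
    outer.put("timeout", 5000);

    final Map<String, Object> subquery = new HashMap<>();
    subquery.put("maxOnDiskStorage", Integer.MAX_VALUE);

    // Prints {maxOnDiskStorage=2147483647, timeout=5000}: the subquery keeps its own
    // maxOnDiskStorage but inherits timeout from the outer query.
    System.out.println(mergeContexts(outer, subquery));
  }
}

This is the behavior the new test exercises: the subquery keeps its large maxOnDiskStorage and bufferGrouperMaxSize, so the inner query succeeds, while the outer query runs with its own zero limits and exhausts grouping resources in GroupByRowProcessor, triggering the new ResourceLimitExceededException.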