mirror of https://github.com/apache/druid.git
Merge pull request #2562 from himanshug/fix_2556
with nested GpBy query outer query results need to be further merged
This commit is contained in:
commit
e5c25725c0
|
@ -199,35 +199,42 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
|
final GroupByQuery outerQuery = new GroupByQuery.Builder(query)
|
||||||
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
|
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
|
||||||
.build();
|
.build();
|
||||||
final IncrementalIndex index = makeIncrementalIndex(innerQuery, subqueryResult);
|
|
||||||
|
final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex(innerQuery, subqueryResult);
|
||||||
|
|
||||||
//Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which
|
//Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which
|
||||||
//is ensured by QuerySegmentSpec.
|
//is ensured by QuerySegmentSpec.
|
||||||
//GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval
|
//GroupByQueryEngine can only process one interval at a time, so we need to call it once per interval
|
||||||
//and concatenate the results.
|
//and concatenate the results.
|
||||||
return new ResourceClosingSequence<>(
|
final IncrementalIndex outerQueryResultIndex = makeIncrementalIndex(
|
||||||
outerQuery.applyLimit(
|
outerQuery,
|
||||||
Sequences.concat(
|
Sequences.concat(
|
||||||
Sequences.map(
|
Sequences.map(
|
||||||
Sequences.simple(outerQuery.getIntervals()),
|
Sequences.simple(outerQuery.getIntervals()),
|
||||||
new Function<Interval, Sequence<Row>>()
|
new Function<Interval, Sequence<Row>>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public Sequence<Row> apply(Interval interval)
|
public Sequence<Row> apply(Interval interval)
|
||||||
{
|
{
|
||||||
return engine.process(
|
return engine.process(
|
||||||
outerQuery.withQuerySegmentSpec(
|
outerQuery.withQuerySegmentSpec(
|
||||||
new MultipleIntervalSegmentSpec(ImmutableList.of(interval))
|
new MultipleIntervalSegmentSpec(ImmutableList.of(interval))
|
||||||
),
|
),
|
||||||
new IncrementalIndexStorageAdapter(index)
|
new IncrementalIndexStorageAdapter(innerQueryResultIndex)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
)
|
|
||||||
)
|
)
|
||||||
),
|
)
|
||||||
index
|
|
||||||
);
|
);
|
||||||
|
|
||||||
|
innerQueryResultIndex.close();
|
||||||
|
|
||||||
|
return new ResourceClosingSequence<>(
|
||||||
|
outerQuery.applyLimit(postAggregate(query, outerQueryResultIndex)),
|
||||||
|
outerQueryResultIndex
|
||||||
|
);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
final IncrementalIndex index = makeIncrementalIndex(
|
final IncrementalIndex index = makeIncrementalIndex(
|
||||||
query, runner.run(
|
query, runner.run(
|
||||||
|
|
|
@ -2809,6 +2809,63 @@ public class GroupByQueryRunnerTest
|
||||||
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubqueryWithExtractionFnInOuterQuery()
|
||||||
|
{
|
||||||
|
//https://github.com/druid-io/druid/issues/2556
|
||||||
|
|
||||||
|
GroupByQuery subquery = GroupByQuery
|
||||||
|
.builder()
|
||||||
|
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||||
|
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
|
||||||
|
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
|
||||||
|
.setDimFilter(new JavaScriptDimFilter("quality", "function(dim){ return true; }"))
|
||||||
|
.setAggregatorSpecs(
|
||||||
|
Arrays.asList(
|
||||||
|
QueryRunnerTestHelper.rowsCount,
|
||||||
|
new LongSumAggregatorFactory("idx", "index")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setGranularity(QueryRunnerTestHelper.dayGran)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
GroupByQuery query = GroupByQuery
|
||||||
|
.builder()
|
||||||
|
.setDataSource(subquery)
|
||||||
|
.setQuerySegmentSpec(
|
||||||
|
new MultipleIntervalSegmentSpec(
|
||||||
|
ImmutableList.of(
|
||||||
|
new Interval("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setDimensions(Lists.<DimensionSpec>newArrayList(
|
||||||
|
new ExtractionDimensionSpec(
|
||||||
|
"alias",
|
||||||
|
"alias",
|
||||||
|
new RegexDimExtractionFn("(a).*", true, "a")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setAggregatorSpecs(
|
||||||
|
Arrays.<AggregatorFactory>asList(
|
||||||
|
new LongSumAggregatorFactory("rows", "rows"),
|
||||||
|
new LongSumAggregatorFactory("idx", "idx")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.setGranularity(QueryRunnerTestHelper.dayGran)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
List<Row> expectedResults = Arrays.asList(
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "a", "rows", 13L, "idx", 6619L),
|
||||||
|
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "a", "rows", 13L, "idx", 5827L)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Subqueries are handled by the ToolChest
|
||||||
|
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
|
||||||
|
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDifferentGroupingSubquery()
|
public void testDifferentGroupingSubquery()
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue