mirror of https://github.com/apache/druid.git
while GroupBy merging use unsorted facts in IncrementalIndex wherever possible
This commit is contained in:
parent
02dfd5cd80
commit
dc0214bddb
|
@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
public class GroupByQueryHelper
|
public class GroupByQueryHelper
|
||||||
{
|
{
|
||||||
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
|
private static final String CTX_KEY_MAX_RESULTS = "maxResults";
|
||||||
|
public final static String CTX_KEY_SORT_RESULTS = "sortResults";
|
||||||
|
|
||||||
public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
|
public static <T> Pair<IncrementalIndex, Accumulator<IncrementalIndex, T>> createIndexAccumulatorPair(
|
||||||
final GroupByQuery query,
|
final GroupByQuery query,
|
||||||
|
@ -81,7 +82,9 @@ public class GroupByQueryHelper
|
||||||
);
|
);
|
||||||
final IncrementalIndex index;
|
final IncrementalIndex index;
|
||||||
|
|
||||||
if (query.getContextBoolean("useOffheap", false)) {
|
final boolean sortResults = query.getContextValue(CTX_KEY_SORT_RESULTS, true);
|
||||||
|
|
||||||
|
if (query.getContextValue("useOffheap", false)) {
|
||||||
index = new OffheapIncrementalIndex(
|
index = new OffheapIncrementalIndex(
|
||||||
// use granularity truncated min timestamp
|
// use granularity truncated min timestamp
|
||||||
// since incoming truncated timestamps may precede timeStart
|
// since incoming truncated timestamps may precede timeStart
|
||||||
|
@ -90,7 +93,7 @@ public class GroupByQueryHelper
|
||||||
aggs.toArray(new AggregatorFactory[aggs.size()]),
|
aggs.toArray(new AggregatorFactory[aggs.size()]),
|
||||||
false,
|
false,
|
||||||
true,
|
true,
|
||||||
true,
|
sortResults,
|
||||||
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()),
|
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()),
|
||||||
bufferPool
|
bufferPool
|
||||||
);
|
);
|
||||||
|
@ -103,7 +106,7 @@ public class GroupByQueryHelper
|
||||||
aggs.toArray(new AggregatorFactory[aggs.size()]),
|
aggs.toArray(new AggregatorFactory[aggs.size()]),
|
||||||
false,
|
false,
|
||||||
true,
|
true,
|
||||||
true,
|
sortResults,
|
||||||
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults())
|
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults())
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,7 +161,18 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
|
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
|
||||||
}
|
}
|
||||||
|
|
||||||
final Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner, context);
|
final Sequence<Row> subqueryResult = mergeGroupByResults(
|
||||||
|
subquery.withOverriddenContext(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
//setting sort to false avoids unnecessary sorting while merging results. we only need to sort
|
||||||
|
//in the end when returning results to user.
|
||||||
|
GroupByQueryHelper.CTX_KEY_SORT_RESULTS,
|
||||||
|
false
|
||||||
|
)
|
||||||
|
),
|
||||||
|
runner,
|
||||||
|
context
|
||||||
|
);
|
||||||
final Set<AggregatorFactory> aggs = Sets.newHashSet();
|
final Set<AggregatorFactory> aggs = Sets.newHashSet();
|
||||||
|
|
||||||
// Nested group-bys work by first running the inner query and then materializing the results in an incremental
|
// Nested group-bys work by first running the inner query and then materializing the results in an incremental
|
||||||
|
@ -200,7 +211,14 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
|
.setLimitSpec(query.getLimitSpec().merge(subquery.getLimitSpec()))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex(innerQuery, subqueryResult);
|
final IncrementalIndex innerQueryResultIndex = makeIncrementalIndex(
|
||||||
|
innerQuery.withOverriddenContext(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, true
|
||||||
|
)
|
||||||
|
),
|
||||||
|
subqueryResult
|
||||||
|
);
|
||||||
|
|
||||||
//Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which
|
//Outer query might have multiple intervals, but they are expected to be non-overlapping and sorted which
|
||||||
//is ensured by QuerySegmentSpec.
|
//is ensured by QuerySegmentSpec.
|
||||||
|
@ -253,7 +271,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
query.getContext()
|
query.getContext()
|
||||||
).withOverriddenContext(
|
).withOverriddenContext(
|
||||||
ImmutableMap.<String, Object>of(
|
ImmutableMap.<String, Object>of(
|
||||||
"finalize", false
|
"finalize", false,
|
||||||
|
//setting sort to false avoids unnecessary sorting while merging results. we only need to sort
|
||||||
|
//in the end when returning results to user.
|
||||||
|
GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
, context
|
, context
|
||||||
|
|
Loading…
Reference in New Issue