add "subtotalsSpec" attribute to groupBy query (#5280)

* add subtotalsSpec attribute to groupBy query

* don't send subtotalsSpec to downstream nodes from broker and other updates

* address review comment

* fix checkstyle issues after merge to master

* add docs for subtotalsSpec feature

* address doc review comments
Himanshu 2018-08-28 17:46:38 -07:00 committed by Gian Merlino
parent fcf8c8d53c
commit 1fae6513e1
11 changed files with 633 additions and 98 deletions

View File

@ -56,7 +56,7 @@ An example groupBy query object is shown below:
}
```
There are 11 main parts to a groupBy query:
The following are the main parts of a groupBy query:
|property|description|required?|
|--------|-----------|---------|
@ -70,6 +70,7 @@ There are 11 main parts to a groupBy query:
|aggregations|See [Aggregations](../querying/aggregations.html)|no|
|postAggregations|See [Post Aggregations](../querying/post-aggregations.html)|no|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|subtotalsSpec|A JSON array of arrays used to return additional result sets for groupings over subsets of the top-level `dimensions`. It is [described later](groupbyquery.html#more-on-subtotalsspec) in more detail.|no|
|context|An additional JSON Object which can be used to specify certain flags.|no|
To pull it all together, the above query would return *n\*m* data points, up to a maximum of 5000 points, where n is the cardinality of the `country` dimension, m is the cardinality of the `device` dimension, each day between 2012-01-01 and 2012-01-03, from the `sample_datasource` table. Each data point contains the (long) sum of `total_usage` if the value of the data point is greater than 100, the (double) sum of `data_transfer` and the (double) result of `total_usage` divided by `data_transfer` for the filter set for a particular grouping of `country` and `device`. The output looks like this:
@ -113,6 +114,92 @@ improve performance.
See [Multi-value dimensions](multi-value-dimensions.html) for more details.
### More on subtotalsSpec
The subtotals feature allows computation of multiple sub-groupings in a single query. To use this feature, add a "subtotalsSpec" to your query, which should be a list of subgroup dimension sets. Each set should contain "outputName" values from the dimensions in your "dimensions" attribute, in the same order as they appear there (although, of course, you may skip some). For example, consider a groupBy query like this one:
```json
{
"type": "groupBy",
...
...
"dimensions": [
{
"type" : "default",
"dimension" : "d1col",
"outputName": "D1"
},
{
"type" : "extraction",
"dimension" : "d2col",
"outputName" : "D2",
"extractionFn" : extraction_func
},
{
"type":"lookup",
"dimension":"d3col",
"outputName":"D3",
"name":"my_lookup"
}
],
...
...
"subtotalsSpec":[ ["D1", "D2", "D3"], ["D1", "D3"], ["D3"]],
..
}
```
The response is equivalent to concatenating the results of 3 groupBy queries whose "dimensions" fields are ["D1", "D2", "D3"], ["D1", "D3"], and ["D3"], each using the appropriate `DimensionSpec` JSON blob from the query above.
The response for the above query would look something like this:
```json
[
{
"version" : "v1",
"timestamp" : "t1",
"event" : { "D1": "..", "D2": "..", "D3": ".." }
},
{
"version" : "v1",
"timestamp" : "t2",
"event" : { "D1": "..", "D2": "..", "D3": ".." }
},
...
...
{
"version" : "v1",
"timestamp" : "t1",
"event" : { "D1": "..", "D3": ".." }
},
{
"version" : "v1",
"timestamp" : "t2",
"event" : { "D1": "..", "D3": ".." }
},
...
...
{
"version" : "v1",
"timestamp" : "t1",
"event" : { "D3": ".." }
},
{
"version" : "v1",
"timestamp" : "t2",
"event" : { "D3": ".." }
},
...
]
```
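The "equivalent to concatenating" behaviour described above can be pictured with a small, self-contained Java sketch. It is only an illustration under simplifying assumptions: `SubtotalsSketch`, `Row`, `row`, `regroup`, and `subtotals` are hypothetical names that do not exist in Druid, each row carries a single long-sum metric, and timestamps and granularity are ignored. Each subtotal is produced by keeping only the listed dimensions of the top-level rows and re-summing rows that collapse onto the same key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SubtotalsSketch
{
  // Hypothetical stand-in for a result row: dimension values plus one summed metric.
  static final class Row
  {
    final Map<String, String> dims;
    final long sum;

    Row(Map<String, String> dims, long sum)
    {
      this.dims = dims;
      this.sum = sum;
    }
  }

  // Convenience factory: row(5, "D1", "a", "D3", "x") has dims {D1=a, D3=x} and sum 5.
  static Row row(long sum, String... dimPairs)
  {
    Map<String, String> dims = new LinkedHashMap<>();
    for (int i = 0; i + 1 < dimPairs.length; i += 2) {
      dims.put(dimPairs[i], dimPairs[i + 1]);
    }
    return new Row(dims, sum);
  }

  // Keep only the dimensions in `keep` and add up the rows that now share a key.
  static List<Row> regroup(List<Row> rows, List<String> keep)
  {
    Map<Map<String, String>, Long> totals = new LinkedHashMap<>();
    for (Row row : rows) {
      Map<String, String> key = new LinkedHashMap<>();
      for (String dim : keep) {
        key.put(dim, row.dims.get(dim));
      }
      totals.merge(key, row.sum, Long::sum);
    }
    List<Row> out = new ArrayList<>();
    for (Map.Entry<Map<String, String>, Long> e : totals.entrySet()) {
      out.add(new Row(e.getKey(), e.getValue()));
    }
    return out;
  }

  // Concatenate one regrouped result set per entry of the subtotals spec.
  static List<Row> subtotals(List<Row> topLevelRows, List<List<String>> spec)
  {
    List<Row> out = new ArrayList<>();
    for (List<String> keep : spec) {
      out.addAll(regroup(topLevelRows, keep));
    }
    return out;
  }

  public static void main(String[] args)
  {
    List<Row> rows = Arrays.asList(
        row(1, "D1", "a", "D3", "x"),
        row(2, "D1", "a", "D3", "y"),
        row(4, "D1", "b", "D3", "x")
    );
    List<List<String>> spec = Arrays.asList(
        Arrays.asList("D1", "D3"),
        Arrays.asList("D3"),
        Collections.<String>emptyList()
    );
    for (Row r : subtotals(rows, spec)) {
      System.out.println(r.dims + " -> " + r.sum);
    }
  }
}
```

Re-summing is only valid here because the metric is a plain sum; the sketch makes no attempt to model other aggregator types.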
### Implementation details
#### Strategies
@ -182,6 +269,10 @@ With groupBy v2, cluster operators should make sure that the off-heap hash table
will not exceed available memory for the maximum possible concurrent query load (given by
druid.processing.numMergeBuffers). See [How much direct memory does Druid use?](../operations/performance-faq.html) for more details.
Brokers do not need merge buffers for basic groupBy queries. Queries with subqueries (using a "query" [dataSource](datasource.html#query-data-source)) require one merge buffer if there is a single subquery, or two merge buffers if there is more than one layer of nested subqueries. Queries with [subtotals](groupbyquery.html#more-on-subtotalsspec) need one merge buffer. These can stack on top of each other: a groupBy query with multiple layers of nested subqueries, and that also uses subtotals, will need three merge buffers.
Historicals and ingestion tasks need one merge buffer for each groupBy query, unless [parallel combination](groupbyquery.html#parallel-combine) is enabled, in which case they need two merge buffers per query.
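The broker-side arithmetic in the paragraph above can be written down as a short Java sketch. The class and method names are hypothetical and are not part of Druid; the numbers are taken only from the text above (zero for a basic query, one for a single subquery, two for more than one layer of nesting, plus one if subtotals are used).

```java
final class BrokerMergeBuffers
{
  // Hypothetical helper, not a Druid API: merge buffers a broker needs for one groupBy query.
  static int required(int subqueryLayers, boolean hasSubtotals)
  {
    if (subqueryLayers < 0) {
      throw new IllegalArgumentException("subqueryLayers must be >= 0");
    }
    // One buffer for a single subquery, two for any deeper nesting.
    int forSubqueries = Math.min(subqueryLayers, 2);
    // Subtotals add one more buffer on top of whatever the subqueries need.
    return forSubqueries + (hasSubtotals ? 1 : 0);
  }

  public static void main(String[] args)
  {
    System.out.println(required(0, false));
    System.out.println(required(1, false));
    System.out.println(required(3, true));
  }
}
```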
When using groupBy v1, all aggregation is done on-heap, and resource limits are done through the parameter
druid.query.groupBy.maxResults. This is a cap on the maximum number of results in a result set. Queries that exceed
this limit will fail with a "Resource limit exceeded" error indicating they exceeded their row limit. Cluster

View File

@ -21,6 +21,7 @@ package io.druid.query.groupby;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Functions;
@ -100,6 +101,7 @@ public class GroupByQuery extends BaseQuery<Row>
private final List<DimensionSpec> dimensions;
private final List<AggregatorFactory> aggregatorSpecs;
private final List<PostAggregator> postAggregatorSpecs;
private final List<List<String>> subtotalsSpec;
private final boolean applyLimitPushDown;
private final Function<Sequence<Row>, Sequence<Row>> postProcessingFn;
@ -116,6 +118,7 @@ public class GroupByQuery extends BaseQuery<Row>
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("having") HavingSpec havingSpec,
@JsonProperty("limitSpec") LimitSpec limitSpec,
@JsonProperty("subtotalsSpec") List<List<String>> subtotalsSpec,
@JsonProperty("context") Map<String, Object> context
)
{
@ -130,6 +133,7 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs,
havingSpec,
limitSpec,
subtotalsSpec,
null,
context
);
@ -172,6 +176,7 @@ public class GroupByQuery extends BaseQuery<Row>
final List<PostAggregator> postAggregatorSpecs,
final HavingSpec havingSpec,
final LimitSpec limitSpec,
final @Nullable List<List<String>> subtotalsSpec,
final @Nullable Function<Sequence<Row>, Sequence<Row>> postProcessingFn,
final Map<String, Object> context
)
@ -194,6 +199,7 @@ public class GroupByQuery extends BaseQuery<Row>
this.havingSpec = havingSpec;
this.limitSpec = LimitSpec.nullToNoopLimitSpec(limitSpec);
this.subtotalsSpec = verifySubtotalsSpec(subtotalsSpec, dimensions);
// Verify no duplicate names between dimensions, aggregators, and postAggregators.
// They will all end up in the same namespace in the returned Rows and we can't have them clobbering each other.
@ -206,6 +212,40 @@ public class GroupByQuery extends BaseQuery<Row>
this.applyLimitPushDown = determineApplyLimitPushDown();
}
private List<List<String>> verifySubtotalsSpec(List<List<String>> subtotalsSpec, List<DimensionSpec> dimensions)
{
// if subtotalsSpec exists then validate that all are subsets of dimensions spec and are in same order.
// For example if we had {D1, D2, D3} in dimensions spec then
// {D2}, {D1, D2}, {D1, D3}, {D2, D3} etc are valid in subtotalsSpec while
// {D2, D1} is not as it is not in same order.
// {D4} is not as it is not a subset.
// This restriction is enforced because implementation does sort merge on the results of top-level query
// results and expects that ordering of events does not change when dimension columns are removed from
// results of top level query.
if (subtotalsSpec != null) {
for (List<String> subtotalSpec : subtotalsSpec) {
int i = 0;
for (String s : subtotalSpec) {
boolean found = false;
for (; i < dimensions.size(); i++) {
if (s.equals(dimensions.get(i).getOutputName())) {
found = true;
break;
}
}
if (!found) {
throw new IAE(
"Subtotal spec %s is either not a subset or items are in different order than in dimensions.",
subtotalSpec
);
}
}
}
}
return subtotalsSpec;
}
@JsonProperty
public VirtualColumns getVirtualColumns()
{
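The ordering rule described in the `verifySubtotalsSpec` comment above can be tried out in isolation with the following sketch. It works on plain output-name strings instead of `DimensionSpec` objects, and `SubtotalsSpecCheck` and `isOrderedSubset` are hypothetical names used only for illustration. One deliberate difference is an assumption of this sketch and not a statement about the commit: the index is advanced past each match, so a repeated name such as {D1, D1} is rejected.

```java
import java.util.Arrays;
import java.util.List;

final class SubtotalsSpecCheck
{
  // True if `subtotal` lists a subset of `dimensionOutputNames` in the same relative order.
  static boolean isOrderedSubset(List<String> dimensionOutputNames, List<String> subtotal)
  {
    int i = 0;
    for (String name : subtotal) {
      boolean found = false;
      for (; i < dimensionOutputNames.size(); i++) {
        if (name.equals(dimensionOutputNames.get(i))) {
          found = true;
          i++; // sketch-only choice: move past the match so duplicates are rejected
          break;
        }
      }
      if (!found) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    List<String> dims = Arrays.asList("D1", "D2", "D3");
    System.out.println(isOrderedSubset(dims, Arrays.asList("D1", "D3")));
    System.out.println(isOrderedSubset(dims, Arrays.asList("D2", "D1")));
    System.out.println(isOrderedSubset(dims, Arrays.asList("D4")));
  }
}
```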
@ -249,6 +289,13 @@ public class GroupByQuery extends BaseQuery<Row>
return limitSpec;
}
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("subtotalsSpec")
public List<List<String>> getSubtotalsSpec()
{
return subtotalsSpec;
}
@Override
public boolean hasFilters()
{
@ -329,6 +376,10 @@ public class GroupByQuery extends BaseQuery<Row>
public boolean determineApplyLimitPushDown()
{
if (subtotalsSpec != null) {
return false;
}
final boolean forceLimitPushDown = validateAndGetForceLimitPushDown();
if (limitSpec instanceof DefaultLimitSpec) {
@ -628,6 +679,16 @@ public class GroupByQuery extends BaseQuery<Row>
return new Builder(this).setLimitSpec(limitSpec).build();
}
public GroupByQuery withAggregatorSpecs(final List<AggregatorFactory> aggregatorSpecs)
{
return new Builder(this).setAggregatorSpecs(aggregatorSpecs).build();
}
public GroupByQuery withSubtotalsSpec(final List<List<String>> subtotalsSpec)
{
return new Builder(this).setSubtotalsSpec(subtotalsSpec).build();
}
public GroupByQuery withPostAggregatorSpecs(final List<PostAggregator> postAggregatorSpecs)
{
return new Builder(this).setPostAggregatorSpecs(postAggregatorSpecs).build();
@ -687,6 +748,7 @@ public class GroupByQuery extends BaseQuery<Row>
private Map<String, Object> context;
private List<List<String>> subtotalsSpec = null;
private LimitSpec limitSpec = null;
private Function<Sequence<Row>, Sequence<Row>> postProcessingFn;
private List<OrderByColumnSpec> orderByColumnSpecs = Lists.newArrayList();
@ -708,6 +770,7 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs = query.getPostAggregatorSpecs();
havingSpec = query.getHavingSpec();
limitSpec = query.getLimitSpec();
subtotalsSpec = query.subtotalsSpec;
postProcessingFn = query.postProcessingFn;
context = query.getContext();
}
@ -788,6 +851,12 @@ public class GroupByQuery extends BaseQuery<Row>
return this;
}
public Builder setSubtotalsSpec(List<List<String>> subtotalsSpec)
{
this.subtotalsSpec = subtotalsSpec;
return this;
}
public Builder addOrderByColumn(String dimension)
{
return addOrderByColumn(dimension, null);
@ -962,6 +1031,7 @@ public class GroupByQuery extends BaseQuery<Row>
postAggregatorSpecs,
havingSpec,
theLimitSpec,
subtotalsSpec,
postProcessingFn,
context
);

View File

@ -218,9 +218,31 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
finalizingResults = subqueryResult;
}
return groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults);
if (query.getSubtotalsSpec() != null) {
return groupByStrategy.processSubtotalsSpec(
query,
resource,
groupByStrategy.processSubqueryResult(subquery, query, resource, finalizingResults)
);
} else {
return groupByStrategy.mergeResults(runner, query, context);
return groupByStrategy.applyPostProcessing(groupByStrategy.processSubqueryResult(
subquery,
query,
resource,
finalizingResults
), query);
}
} else {
if (query.getSubtotalsSpec() != null) {
return groupByStrategy.processSubtotalsSpec(
query,
resource,
groupByStrategy.mergeResults(runner, query.withSubtotalsSpec(null), context)
);
} else {
return groupByStrategy.applyPostProcessing(groupByStrategy.mergeResults(runner, query, context), query);
}
}
}

View File

@ -22,7 +22,6 @@ package io.druid.query.groupby.epinephelinae;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import io.druid.collections.ResourceHolder;
import io.druid.common.guava.SettableSupplier;
import io.druid.data.input.Row;
@ -30,7 +29,6 @@ import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.FilteredSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.Query;
@ -58,7 +56,7 @@ import java.util.UUID;
public class GroupByRowProcessor
{
public static Sequence<Row> process(
public static Grouper createGrouper(
final Query queryParam,
final Sequence<Row> rows,
final Map<String, ValueType> rowSignature,
@ -66,7 +64,8 @@ public class GroupByRowProcessor
final GroupByQueryResource resource,
final ObjectMapper spillMapper,
final String processingTmpDir,
final int mergeBufferSize
final int mergeBufferSize,
final List<Closeable> closeOnExit
)
{
final GroupByQuery query = (GroupByQuery) queryParam;
@ -122,17 +121,6 @@ public class GroupByRowProcessor
}
);
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<RowBasedKey, Row>>()
{
@Override
public CloseableGrouperIterator<RowBasedKey, Row> make()
{
// This contains all closeable objects which are closed when the returned iterator iterates all the elements,
// or an exception is thrown. The objects are closed in their reverse order.
final List<Closeable> closeOnExit = Lists.newArrayList();
try {
final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
temporaryStorageDirectory,
querySpecificConfig.getMaxOnDiskStorage()
@ -169,29 +157,24 @@ public class GroupByRowProcessor
throw new ResourceLimitExceededException(retVal.getReason());
}
return RowBasedGrouperHelper.makeGrouperIterator(
grouper,
query,
new Closeable()
return grouper;
}
public static Sequence<Row> getRowsFromGrouper(GroupByQuery query, List<String> subtotalSpec, Supplier<Grouper> grouper)
{
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<RowBasedKey, Row>>()
{
@Override
public void close()
public CloseableGrouperIterator<RowBasedKey, Row> make()
{
for (Closeable closeable : Lists.reverse(closeOnExit)) {
CloseQuietly.close(closeable);
}
}
}
return RowBasedGrouperHelper.makeGrouperIterator(
grouper.get(),
query,
subtotalSpec,
() -> {}
);
}
catch (Throwable e) {
// Exception caught while setting up the iterator; release resources.
for (Closeable closeable : Lists.reverse(closeOnExit)) {
CloseQuietly.close(closeable);
}
throw e;
}
}
@Override
public void cleanup(CloseableGrouperIterator<RowBasedKey, Row> iterFromMake)

View File

@ -76,6 +76,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -409,6 +410,16 @@ public class RowBasedGrouperHelper
final GroupByQuery query,
final Closeable closeable
)
{
return makeGrouperIterator(grouper, query, null, closeable);
}
public static CloseableGrouperIterator<RowBasedKey, Row> makeGrouperIterator(
final Grouper<RowBasedKey> grouper,
final GroupByQuery query,
final List<String> dimsToInclude,
final Closeable closeable
)
{
final boolean includeTimestamp = GroupByStrategyV2.getUniversalTimestamp(query) == null;
@ -435,6 +446,7 @@ public class RowBasedGrouperHelper
}
// Add dimensions.
if (dimsToInclude == null) {
for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
Object dimVal = entry.getKey().getKey()[i];
theMap.put(
@ -442,6 +454,20 @@ public class RowBasedGrouperHelper
dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
);
}
} else {
Map<String, Object> dimensions = new HashMap<>();
for (int i = dimStart; i < entry.getKey().getKey().length; i++) {
Object dimVal = entry.getKey().getKey()[i];
dimensions.put(
query.getDimensions().get(i - dimStart).getOutputName(),
dimVal instanceof String ? NullHandling.emptyToNullIfNeeded((String) dimVal) : dimVal
);
}
for (String dimToInclude : dimsToInclude) {
theMap.put(dimToInclude, dimensions.get(dimToInclude));
}
}
// Add aggregations.
for (int i = 0; i < entry.getValues().length; i++) {

View File

@ -73,6 +73,11 @@ public interface GroupByStrategy
Map<String, Object> responseContext
);
Sequence<Row> applyPostProcessing(
Sequence<Row> results,
GroupByQuery query
);
Sequence<Row> processSubqueryResult(
GroupByQuery subquery,
GroupByQuery query,
@ -80,6 +85,12 @@ public interface GroupByStrategy
Sequence<Row> subqueryResult
);
Sequence<Row> processSubtotalsSpec(
GroupByQuery query,
GroupByQueryResource resource,
Sequence<Row> queryResult
);
QueryRunner<Row> mergeRunners(
ListeningExecutorService exec,
Iterable<QueryRunner<Row>> queryRunners

View File

@ -21,6 +21,7 @@ package io.druid.query.groupby.strategy;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
@ -55,6 +56,11 @@ public class GroupByStrategySelector
return strategyV2;
case STRATEGY_V1:
// Fail early if subtotals were asked from GroupBy V1
if (query.getSubtotalsSpec() != null) {
throw new IAE("GroupBy Strategy [%s] does not support subtotalsSpec.", STRATEGY_V1);
}
return strategyV1;
default:

View File

@ -146,7 +146,15 @@ public class GroupByStrategyV1 implements GroupByStrategy
true
);
return Sequences.withBaggage(query.postProcess(GroupByQueryHelper.postAggregate(query, index)), index);
return Sequences.withBaggage(GroupByQueryHelper.postAggregate(query, index), index);
}
@Override
public Sequence<Row> applyPostProcessing(
Sequence<Row> results, GroupByQuery query
)
{
return query.postProcess(results);
}
@Override
@ -254,6 +262,14 @@ public class GroupByStrategyV1 implements GroupByStrategy
);
}
@Override
public Sequence<Row> processSubtotalsSpec(
GroupByQuery query, GroupByQueryResource resource, Sequence<Row> queryResult
)
{
throw new UnsupportedOperationException("subtotalsSpec is not supported for v1 groupBy strategy.");
}
@Override
public QueryRunner<Row> mergeRunners(
final ListeningExecutorService exec,

View File

@ -22,7 +22,9 @@ package io.druid.query.groupby.strategy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListeningExecutorService;
@ -38,6 +40,7 @@ import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.nary.BinaryFn;
@ -54,6 +57,8 @@ import io.druid.query.QueryWatcher;
import io.druid.query.ResourceLimitExceededException;
import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryHelper;
@ -62,14 +67,19 @@ import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;
import io.druid.query.groupby.epinephelinae.GroupByRowProcessor;
import io.druid.query.groupby.epinephelinae.Grouper;
import io.druid.query.groupby.resource.GroupByQueryResource;
import io.druid.segment.StorageAdapter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class GroupByStrategyV2 implements GroupByStrategy
{
@ -131,7 +141,8 @@ public class GroupByStrategyV2 implements GroupByStrategy
public GroupByQueryResource prepareResource(GroupByQuery query, boolean willMergeRunners)
{
if (!willMergeRunners) {
final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1);
final int requiredMergeBufferNum = countRequiredMergeBufferNum(query, 1) +
(query.getSubtotalsSpec() != null ? 1 : 0);
if (requiredMergeBufferNum > mergeBufferPool.maxSize()) {
throw new ResourceLimitExceededException(
@ -250,12 +261,13 @@ public class GroupByStrategyV2 implements GroupByStrategy
// Don't do "having" clause until the end of this method.
null,
query.getLimitSpec(),
query.getSubtotalsSpec(),
query.getContext()
).withOverriddenContext(
context.build()
);
Sequence<Row> rowSequence = Sequences.map(
return Sequences.map(
mergingQueryRunner.run(
QueryPlus.wrap(newQuery),
responseContext
@ -291,12 +303,18 @@ public class GroupByStrategyV2 implements GroupByStrategy
}
}
);
}
@Override
public Sequence<Row> applyPostProcessing(
Sequence<Row> results, GroupByQuery query
)
{
// Don't apply limit here for inner results, that will be pushed down to the BufferHashGrouper
if (query.getContextBoolean(CTX_KEY_OUTERMOST, true)) {
return query.postProcess(rowSequence);
return query.postProcess(results);
} else {
return rowSequence;
return results;
}
}
@ -308,7 +326,13 @@ public class GroupByStrategyV2 implements GroupByStrategy
Sequence<Row> subqueryResult
)
{
final Sequence<Row> results = GroupByRowProcessor.process(
// This contains all closeable objects which are closed when the returned iterator iterates all the elements,
// or an exception is thrown. The objects are closed in their reverse order.
final List<Closeable> closeOnExit = Lists.newArrayList();
try {
Supplier<Grouper> grouperSupplier = Suppliers.memoize(
() -> GroupByRowProcessor.createGrouper(
query,
subqueryResult,
GroupByQueryHelper.rowSignatureFor(subquery),
@ -316,16 +340,113 @@ public class GroupByStrategyV2 implements GroupByStrategy
resource,
spillMapper,
processingConfig.getTmpDir(),
processingConfig.intermediateComputeSizeBytes()
processingConfig.intermediateComputeSizeBytes(),
closeOnExit
)
);
return mergeResults(new QueryRunner<Row>()
return Sequences.withBaggage(
mergeResults(new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
{
return results;
return GroupByRowProcessor.getRowsFromGrouper(
query,
null,
grouperSupplier
);
}
}, query, null),
() -> Lists.reverse(closeOnExit).forEach(closeable -> CloseQuietly.close(closeable))
);
}
catch (Exception ex) {
Lists.reverse(closeOnExit).forEach(closeable -> CloseQuietly.close(closeable));
throw ex;
}
}
@Override
public Sequence<Row> processSubtotalsSpec(
GroupByQuery query,
GroupByQueryResource resource,
Sequence<Row> queryResult
)
{
// This contains all closeable objects which are closed when the returned iterator iterates all the elements,
// or an exception is thrown. The objects are closed in their reverse order.
final List<Closeable> closeOnExit = Lists.newArrayList();
try {
GroupByQuery queryWithoutSubtotalsSpec = query.withSubtotalsSpec(null);
List<List<String>> subtotals = query.getSubtotalsSpec();
Supplier<Grouper> grouperSupplier = Suppliers.memoize(
() -> GroupByRowProcessor.createGrouper(
queryWithoutSubtotalsSpec.withAggregatorSpecs(
Lists.transform(queryWithoutSubtotalsSpec.getAggregatorSpecs(), (agg) -> agg.getCombiningFactory())
).withDimensionSpecs(
Lists.transform(
queryWithoutSubtotalsSpec.getDimensions(),
(dimSpec) -> new DefaultDimensionSpec(
dimSpec.getOutputName(),
dimSpec.getOutputName(),
dimSpec.getOutputType()
)
)
),
queryResult,
GroupByQueryHelper.rowSignatureFor(queryWithoutSubtotalsSpec),
configSupplier.get(),
resource,
spillMapper,
processingConfig.getTmpDir(),
processingConfig.intermediateComputeSizeBytes(),
closeOnExit
)
);
List<Sequence<Row>> subtotalsResults = new ArrayList<>(subtotals.size());
Map<String, DimensionSpec> queryDimensionSpecs = new HashMap<>(queryWithoutSubtotalsSpec.getDimensions().size());
for (DimensionSpec dimSpec : queryWithoutSubtotalsSpec.getDimensions()) {
queryDimensionSpecs.put(dimSpec.getOutputName(), dimSpec);
}
for (List<String> subtotalSpec : subtotals) {
GroupByQuery subtotalQuery = queryWithoutSubtotalsSpec.withDimensionSpecs(
subtotalSpec.stream()
.map(s -> new DefaultDimensionSpec(s, s, queryDimensionSpecs.get(s).getOutputType()))
.collect(Collectors.toList())
);
subtotalsResults.add(applyPostProcessing(
mergeResults(new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
{
return GroupByRowProcessor.getRowsFromGrouper(
queryWithoutSubtotalsSpec,
subtotalSpec,
grouperSupplier
);
}
}, subtotalQuery, null),
subtotalQuery
)
);
}
return Sequences.withBaggage(
Sequences.concat(subtotalsResults),
() -> Lists.reverse(closeOnExit).forEach(closeable -> CloseQuietly.close(closeable))
);
}
catch (Exception ex) {
Lists.reverse(closeOnExit).forEach(closeable -> CloseQuietly.close(closeable));
throw ex;
}
}, query, null);
}
@Override
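Both `processSubqueryResult` and `processSubtotalsSpec` above collect resources in a `closeOnExit` list and release them in reverse order, either as baggage on the returned sequence or in the `catch` block. The pattern can be sketched with only `java.io.Closeable`. `CloseOnExitSketch` and `closeInReverse` are hypothetical names, and the plain try/catch stands in for Druid's `CloseQuietly` on the assumption that a failure to close one resource should not prevent closing the others.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class CloseOnExitSketch
{
  // Close every registered resource, last-registered first, ignoring close failures.
  static void closeInReverse(List<Closeable> closeOnExit)
  {
    for (int i = closeOnExit.size() - 1; i >= 0; i--) {
      try {
        closeOnExit.get(i).close();
      }
      catch (IOException e) {
        // Swallowed on purpose in this sketch so the remaining resources still get closed.
      }
    }
  }

  public static void main(String[] args)
  {
    List<String> log = new ArrayList<>();
    List<Closeable> closeOnExit = new ArrayList<>();
    closeOnExit.add(() -> log.add("temporary storage"));
    closeOnExit.add(() -> log.add("grouper"));
    closeInReverse(closeOnExit);
    System.out.println(log);
  }
}
```

Closing in reverse order means a resource that depends on an earlier one (here, the grouper on its temporary storage) is released first.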

View File

@ -5586,6 +5586,194 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByWithSubtotalsSpec()
{
if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
return;
}
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"), new DefaultDimensionSpec("market", "market")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index"),
new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
new DoubleSumAggregatorFactory("idxDouble", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.setSubtotalsSpec(ImmutableList.of(
ImmutableList.of("alias"),
ImmutableList.of("market"),
ImmutableList.of()
))
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L, "idxFloat", 135.88510131835938f, "idxDouble", 135.88510131835938d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L, "idxFloat", 118.57034, "idxDouble", 118.57034),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L, "idxFloat", 158.747224, "idxDouble", 158.747224),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L, "idxFloat", 120.134704, "idxDouble", 120.134704),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L, "idxFloat", 2871.8866900000003f, "idxDouble", 2871.8866900000003d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L, "idxFloat", 121.58358f, "idxDouble", 121.58358d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L, "idxFloat", 2900.798647f, "idxDouble", 2900.798647d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L, "idxFloat", 78.622547f, "idxDouble", 78.622547d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L, "idxFloat", 119.922742f, "idxDouble", 119.922742d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L, "idxFloat", 147.42593f, "idxDouble", 147.42593d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L, "idxFloat", 112.987027f, "idxDouble", 112.987027d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L, "idxFloat", 166.016049f, "idxDouble", 166.016049d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L, "idxFloat", 113.446008f, "idxDouble", 113.446008d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L, "idxFloat", 2448.830613f, "idxDouble", 2448.830613d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L, "idxFloat", 114.290141f, "idxDouble", 114.290141d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L, "idxFloat", 2506.415148f, "idxDouble", 2506.415148d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L, "idxFloat", 97.387433f, "idxDouble", 97.387433d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L, "idxFloat", 126.411364f, "idxDouble", 126.411364d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 643.043177, "idxFloat", 643.043212890625, "rows", 5L, "idx", 640L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "total_market", "idxDouble", 1314.839715, "idxFloat", 1314.8397, "rows", 1L, "idx", 1314L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "upfront", "idxDouble", 1447.34116, "idxFloat", 1447.3412, "rows", 1L, "idx", 1447L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 266.090949, "idxFloat", 266.0909423828125, "rows", 2L, "idx", 265L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "total_market", "idxDouble", 1522.043733, "idxFloat", 1522.0437, "rows", 1L, "idx", 1522L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "upfront", "idxDouble", 1234.247546, "idxFloat", 1234.2476, "rows", 1L, "idx", 1234L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 198.545289, "idxFloat", 198.5452880859375, "rows", 2L, "idx", 197L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 650.806953, "idxFloat", 650.8069458007812, "rows", 5L, "idx", 648L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "total_market", "idxDouble", 1193.556278, "idxFloat", 1193.5563, "rows", 1L, "idx", 1193L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "upfront", "idxDouble", 1144.342401, "idxFloat", 1144.3424, "rows", 1L, "idx", 1144L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 249.591647, "idxFloat", 249.59164428710938, "rows", 2L, "idx", 249L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "total_market", "idxDouble", 1321.375057, "idxFloat", 1321.375, "rows", 1L, "idx", 1321L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "upfront", "idxDouble", 1049.738585, "idxFloat", 1049.7385, "rows", 1L, "idx", 1049L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 223.798797, "idxFloat", 223.79879760742188, "rows", 2L, "idx", 223L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 6626.151575318359, "idxFloat", 6626.152f, "rows", 13L, "idx", 6619L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 5833.209713, "idxFloat", 5833.209f, "rows", 13L, "idx", 5827L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByWithSubtotalsSpecWithLongDimensionColumn()
{
if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
return;
}
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.newArrayList(new DefaultDimensionSpec("qualityLong", "ql", ValueType.LONG), new DefaultDimensionSpec("market", "market")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index"),
new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
new DoubleSumAggregatorFactory("idxDouble", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.setSubtotalsSpec(ImmutableList.of(
ImmutableList.of("ql"),
ImmutableList.of("market"),
ImmutableList.of()
))
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 135.885094, "idxFloat", 135.8851, "ql", 1000L, "rows", 1L, "idx", 135L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 118.57034, "idxFloat", 118.57034, "ql", 1100L, "rows", 1L, "idx", 118L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 158.747224, "idxFloat", 158.74722, "ql", 1200L, "rows", 1L, "idx", 158L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 120.134704, "idxFloat", 120.134705, "ql", 1300L, "rows", 1L, "idx", 120L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 2871.8866900000003, "idxFloat", 2871.88671875, "ql", 1400L, "rows", 3L, "idx", 2870L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 121.583581, "idxFloat", 121.58358, "ql", 1500L, "rows", 1L, "idx", 121L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 2900.798647, "idxFloat", 2900.798583984375, "ql", 1600L, "rows", 3L, "idx", 2900L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 78.622547, "idxFloat", 78.62254, "ql", 1700L, "rows", 1L, "idx", 78L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 119.922742, "idxFloat", 119.922745, "ql", 1800L, "rows", 1L, "idx", 119L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 147.425935, "idxFloat", 147.42593, "ql", 1000L, "rows", 1L, "idx", 147L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 112.987027, "idxFloat", 112.98703, "ql", 1100L, "rows", 1L, "idx", 112L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 166.016049, "idxFloat", 166.01605, "ql", 1200L, "rows", 1L, "idx", 166L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 113.446008, "idxFloat", 113.44601, "ql", 1300L, "rows", 1L, "idx", 113L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 2448.830613, "idxFloat", 2448.83056640625, "ql", 1400L, "rows", 3L, "idx", 2447L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 114.290141, "idxFloat", 114.29014, "ql", 1500L, "rows", 1L, "idx", 114L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 2506.415148, "idxFloat", 2506.4150390625, "ql", 1600L, "rows", 3L, "idx", 2505L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 97.387433, "idxFloat", 97.387436, "ql", 1700L, "rows", 1L, "idx", 97L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 126.411364, "idxFloat", 126.41136, "ql", 1800L, "rows", 1L, "idx", 126L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 643.043177, "idxFloat", 643.043212890625, "rows", 5L, "idx", 640L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "total_market", "idxDouble", 1314.839715, "idxFloat", 1314.8397, "rows", 1L, "idx", 1314L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "upfront", "idxDouble", 1447.34116, "idxFloat", 1447.3412, "rows", 1L, "idx", 1447L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 266.090949, "idxFloat", 266.0909423828125, "rows", 2L, "idx", 265L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "total_market", "idxDouble", 1522.043733, "idxFloat", 1522.0437, "rows", 1L, "idx", 1522L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "upfront", "idxDouble", 1234.247546, "idxFloat", 1234.2476, "rows", 1L, "idx", 1234L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 198.545289, "idxFloat", 198.5452880859375, "rows", 2L, "idx", 197L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 650.806953, "idxFloat", 650.8069458007812, "rows", 5L, "idx", 648L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "total_market", "idxDouble", 1193.556278, "idxFloat", 1193.5563, "rows", 1L, "idx", 1193L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "upfront", "idxDouble", 1144.342401, "idxFloat", 1144.3424, "rows", 1L, "idx", 1144L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 249.591647, "idxFloat", 249.59164428710938, "rows", 2L, "idx", 249L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "total_market", "idxDouble", 1321.375057, "idxFloat", 1321.375, "rows", 1L, "idx", 1321L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "upfront", "idxDouble", 1049.738585, "idxFloat", 1049.7385, "rows", 1L, "idx", 1049L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "market", "spot", "idxDouble", 223.798797, "idxFloat", 223.79879760742188, "rows", 2L, "idx", 223L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 6626.151569, "idxFloat", 6626.1513671875, "rows", 13L, "idx", 6619L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02T00:00:00.000Z", "idxDouble", 5833.209717999999, "idxFloat", 5833.20849609375, "rows", 13L, "idx", 5827L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByWithSubtotalsSpecWithOrderLimit()
{
if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
return;
}
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"), new DefaultDimensionSpec("market", "market")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index"),
new FloatSumAggregatorFactory("idxFloat", "indexFloat"),
new DoubleSumAggregatorFactory("idxDouble", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.setSubtotalsSpec(ImmutableList.of(
ImmutableList.of("alias"),
ImmutableList.of("market"),
ImmutableList.of()
))
.addOrderByColumn("idxDouble")
.setLimit(1)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L, "idxFloat", 78.622547f, "idxDouble", 78.622547d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "market", "spot", "idxDouble", 198.545289, "idxFloat", 198.5452880859375, "rows", 2L, "idx", 197L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01T00:00:00.000Z", "idxDouble", 6626.151575318359, "idxFloat", 6626.152f, "rows", 13L, "idx", 6619L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByWithTimeColumn()
{

@ -924,6 +924,7 @@ public class DruidQuery
postAggregators,
grouping.getHavingFilter() != null ? new DimFilterHavingSpec(grouping.getHavingFilter(), true) : null,
limitSpec,
null,
ImmutableSortedMap.copyOf(plannerContext.getQueryContext())
);
}