添加查询需要的文档
This commit is contained in:
parent
6318c33333
commit
9db8616086
|
@ -0,0 +1,449 @@
|
||||||
|
# GroupBy 查询
|
||||||
|
|
||||||
|
> Apache Druid supports two query languages: [Druid SQL](sql.md) and [native queries](querying.md).
|
||||||
|
> This document describes a query
|
||||||
|
> type in the native language. For information about when Druid SQL will use this query type, refer to the
|
||||||
|
> [SQL documentation](sql.md#query-types).
|
||||||
|
|
||||||
|
These types of Apache Druid queries take a groupBy query object and return an array of JSON objects where each object represents a
|
||||||
|
grouping asked for by the query.
|
||||||
|
|
||||||
|
> Note: If you are doing aggregations with time as your only grouping, or an ordered groupBy over a single dimension,
|
||||||
|
> consider [Timeseries](timeseriesquery.md) and [TopN](topnquery.md) queries as well as
|
||||||
|
> groupBy. Their performance may be better in some cases. See [Alternatives](#alternatives) below for more details.
|
||||||
|
|
||||||
|
一个分组查询(groupBy query)对象的查询脚本如下示例:
|
||||||
|
|
||||||
|
``` json
|
||||||
|
{
|
||||||
|
"queryType": "groupBy",
|
||||||
|
"dataSource": "sample_datasource",
|
||||||
|
"granularity": "day",
|
||||||
|
"dimensions": ["country", "device"],
|
||||||
|
"limitSpec": { "type": "default", "limit": 5000, "columns": ["country", "data_transfer"] },
|
||||||
|
"filter": {
|
||||||
|
"type": "and",
|
||||||
|
"fields": [
|
||||||
|
{ "type": "selector", "dimension": "carrier", "value": "AT&T" },
|
||||||
|
{ "type": "or",
|
||||||
|
"fields": [
|
||||||
|
{ "type": "selector", "dimension": "make", "value": "Apple" },
|
||||||
|
{ "type": "selector", "dimension": "make", "value": "Samsung" }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"aggregations": [
|
||||||
|
{ "type": "longSum", "name": "total_usage", "fieldName": "user_count" },
|
||||||
|
{ "type": "doubleSum", "name": "data_transfer", "fieldName": "data_transfer" }
|
||||||
|
],
|
||||||
|
"postAggregations": [
|
||||||
|
{ "type": "arithmetic",
|
||||||
|
"name": "avg_usage",
|
||||||
|
"fn": "/",
|
||||||
|
"fields": [
|
||||||
|
{ "type": "fieldAccess", "fieldName": "data_transfer" },
|
||||||
|
{ "type": "fieldAccess", "fieldName": "total_usage" }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"intervals": [ "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000" ],
|
||||||
|
"having": {
|
||||||
|
"type": "greaterThan",
|
||||||
|
"aggregation": "total_usage",
|
||||||
|
"value": 100
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Following are main parts to a groupBy query:
|
||||||
|
|
||||||
|
|property|description|required?|
|
||||||
|
|--------|-----------|---------|
|
||||||
|
|queryType|This String should always be "groupBy"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|
||||||
|
|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.md) for more information.|yes|
|
||||||
|
|dimensions|A JSON list of dimensions to do the groupBy over; or see [DimensionSpec](../querying/dimensionspecs.md) for ways to extract dimensions. |yes|
|
||||||
|
|limitSpec|See [LimitSpec](../querying/limitspec.md).|no|
|
||||||
|
|having|See [Having](../querying/having.md).|no|
|
||||||
|
|granularity|Defines the granularity of the query. See [Granularities](../querying/granularities.md)|yes|
|
||||||
|
|filter|See [Filters](../querying/filters.md)|no|
|
||||||
|
|aggregations|See [Aggregations](../querying/aggregations.md)|no|
|
||||||
|
|postAggregations|See [Post Aggregations](../querying/post-aggregations.md)|no|
|
||||||
|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|
||||||
|
|subtotalsSpec| A JSON array of arrays to return additional result sets for groupings of subsets of top level `dimensions`. It is [described later](groupbyquery.md#more-on-subtotalsspec) in more detail.|no|
|
||||||
|
|context|An additional JSON Object which can be used to specify certain flags.|no|
|
||||||
|
|
||||||
|
To pull it all together, the above query would return *n\*m* data points, up to a maximum of 5000 points, where n is the cardinality of the `country` dimension, m is the cardinality of the `device` dimension, each day between 2012-01-01 and 2012-01-03, from the `sample_datasource` table. Each data point contains the (long) sum of `total_usage` if the value of the data point is greater than 100, the (double) sum of `data_transfer` and the (double) result of `total_usage` divided by `data_transfer` for the filter set for a particular grouping of `country` and `device`. The output looks like this:
|
||||||
|
|
||||||
|
```json
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"version" : "v1",
|
||||||
|
"timestamp" : "2012-01-01T00:00:00.000Z",
|
||||||
|
"event" : {
|
||||||
|
"country" : <some_dim_value_one>,
|
||||||
|
"device" : <some_dim_value_two>,
|
||||||
|
"total_usage" : <some_value_one>,
|
||||||
|
"data_transfer" :<some_value_two>,
|
||||||
|
"avg_usage" : <some_avg_usage_value>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"version" : "v1",
|
||||||
|
"timestamp" : "2012-01-01T00:00:12.000Z",
|
||||||
|
"event" : {
|
||||||
|
"dim1" : <some_other_dim_value_one>,
|
||||||
|
"dim2" : <some_other_dim_value_two>,
|
||||||
|
"sample_name1" : <some_other_value_one>,
|
||||||
|
"sample_name2" :<some_other_value_two>,
|
||||||
|
"avg_usage" : <some_other_avg_usage_value>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
...
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
## Behavior on multi-value dimensions
|
||||||
|
|
||||||
|
groupBy queries can group on multi-value dimensions. When grouping on a multi-value dimension, _all_ values
|
||||||
|
from matching rows will be used to generate one group per value. It's possible for a query to return more groups than
|
||||||
|
there are rows. For example, a groupBy on the dimension `tags` with filter `"t1" AND "t3"` would match only row1, and
|
||||||
|
generate a result with three groups: `t1`, `t2`, and `t3`. If you only need to include values that match
|
||||||
|
your filter, you can use a [filtered dimensionSpec](dimensionspecs.md#filtered-dimensionspecs). This can also
|
||||||
|
improve performance.
|
||||||
|
|
||||||
|
See [Multi-value dimensions](multi-value-dimensions.md) for more details.
|
||||||
|
|
||||||
|
## More on subtotalsSpec
|
||||||
|
|
||||||
|
The subtotals feature allows computation of multiple sub-groupings in a single query. To use this feature, add a "subtotalsSpec" to your query as a list of subgroup dimension sets. It should contain the `outputName` from dimensions in your `dimensions` attribute, in the same order as they appear in the `dimensions` attribute (although, of course, you may skip some).
|
||||||
|
|
||||||
|
For example, consider a groupBy query like this one:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"type": "groupBy",
|
||||||
|
...
|
||||||
|
...
|
||||||
|
"dimensions": [
|
||||||
|
{
|
||||||
|
"type" : "default",
|
||||||
|
"dimension" : "d1col",
|
||||||
|
"outputName": "D1"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type" : "extraction",
|
||||||
|
"dimension" : "d2col",
|
||||||
|
"outputName" : "D2",
|
||||||
|
"extractionFn" : extraction_func
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type":"lookup",
|
||||||
|
"dimension":"d3col",
|
||||||
|
"outputName":"D3",
|
||||||
|
"name":"my_lookup"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
...
|
||||||
|
...
|
||||||
|
"subtotalsSpec":[ ["D1", "D2", D3"], ["D1", "D3"], ["D3"]],
|
||||||
|
..
|
||||||
|
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The result of the subtotalsSpec would be equivalent to concatenating the result of three groupBy queries, with the "dimensions" field being `["D1", "D2", D3"]`, `["D1", "D3"]` and `["D3"]`, given the `DimensionSpec` shown above.
|
||||||
|
The response for the query above would look something like:
|
||||||
|
|
||||||
|
```json
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"version" : "v1",
|
||||||
|
"timestamp" : "t1",
|
||||||
|
"event" : { "D1": "..", "D2": "..", "D3": ".." }
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"version" : "v1",
|
||||||
|
"timestamp" : "t2",
|
||||||
|
"event" : { "D1": "..", "D2": "..", "D3": ".." }
|
||||||
|
}
|
||||||
|
},
|
||||||
|
...
|
||||||
|
...
|
||||||
|
|
||||||
|
{
|
||||||
|
"version" : "v1",
|
||||||
|
"timestamp" : "t1",
|
||||||
|
"event" : { "D1": "..", "D2": null, "D3": ".." }
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"version" : "v1",
|
||||||
|
"timestamp" : "t2",
|
||||||
|
"event" : { "D1": "..", "D2": null, "D3": ".." }
|
||||||
|
}
|
||||||
|
},
|
||||||
|
...
|
||||||
|
...
|
||||||
|
|
||||||
|
{
|
||||||
|
"version" : "v1",
|
||||||
|
"timestamp" : "t1",
|
||||||
|
"event" : { "D1": null, "D2": null, "D3": ".." }
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"version" : "v1",
|
||||||
|
"timestamp" : "t2",
|
||||||
|
"event" : { "D1": null, "D2": null, "D3": ".." }
|
||||||
|
}
|
||||||
|
},
|
||||||
|
...
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
> Notice that dimensions that are not included in an individual subtotalsSpec grouping are returned with a `null` value. This response format represents a behavior change as of Apache Druid 0.18.0.
|
||||||
|
> In release 0.17.0 and earlier, such dimensions were entirely excluded from the result. If you were relying on this old behavior to determine whether a particular dimension was not part of
|
||||||
|
> a subtotal grouping, you can now use [Grouping aggregator](aggregations.md#grouping-aggregator) instead.
|
||||||
|
|
||||||
|
|
||||||
|
## Implementation details
|
||||||
|
|
||||||
|
### Strategies
|
||||||
|
|
||||||
|
GroupBy queries can be executed using two different strategies. The default strategy for a cluster is determined by the
|
||||||
|
"druid.query.groupBy.defaultStrategy" runtime property on the Broker. This can be overridden using "groupByStrategy" in
|
||||||
|
the query context. If neither the context field nor the property is set, the "v2" strategy will be used.
|
||||||
|
|
||||||
|
- "v2", the default, is designed to offer better performance and memory management. This strategy generates
|
||||||
|
per-segment results using a fully off-heap map. Data processes merge the per-segment results using a fully off-heap
|
||||||
|
concurrent facts map combined with an on-heap string dictionary. This may optionally involve spilling to disk. Data
|
||||||
|
processes return sorted results to the Broker, which merges result streams using an N-way merge. The broker materializes
|
||||||
|
the results if necessary (e.g. if the query sorts on columns other than its dimensions). Otherwise, it streams results
|
||||||
|
back as they are merged.
|
||||||
|
|
||||||
|
- "v1", a legacy engine, generates per-segment results on data processes (Historical, realtime, MiddleManager) using a map which
|
||||||
|
is partially on-heap (dimension keys and the map itself) and partially off-heap (the aggregated values). Data processes then
|
||||||
|
merge the per-segment results using Druid's indexing mechanism. This merging is multi-threaded by default, but can
|
||||||
|
optionally be single-threaded. The Broker merges the final result set using Druid's indexing mechanism again. The broker
|
||||||
|
merging is always single-threaded. Because the Broker merges results using the indexing mechanism, it must materialize
|
||||||
|
the full result set before returning any results. On both the data processes and the Broker, the merging index is fully
|
||||||
|
on-heap by default, but it can optionally store aggregated values off-heap.
|
||||||
|
|
||||||
|
### Differences between v1 and v2
|
||||||
|
|
||||||
|
Query API and results are compatible between the two engines; however, there are some differences from a cluster
|
||||||
|
configuration perspective:
|
||||||
|
|
||||||
|
- groupBy v1 controls resource usage using a row-based limit (maxResults) whereas groupBy v2 uses bytes-based limits.
|
||||||
|
In addition, groupBy v1 merges results on-heap, whereas groupBy v2 merges results off-heap. These factors mean that
|
||||||
|
memory tuning and resource limits behave differently between v1 and v2. In particular, due to this, some queries
|
||||||
|
that can complete successfully in one engine may exceed resource limits and fail with the other engine. See the
|
||||||
|
"Memory tuning and resource limits" section for more details.
|
||||||
|
- groupBy v1 imposes no limit on the number of concurrently running queries, whereas groupBy v2 controls memory usage
|
||||||
|
by using a finite-sized merge buffer pool. By default, the number of merge buffers is 1/4 the number of processing
|
||||||
|
threads. You can adjust this as necessary to balance concurrency and memory usage.
|
||||||
|
- groupBy v1 supports caching on either the Broker or Historical processes, whereas groupBy v2 only supports caching on
|
||||||
|
Historical processes.
|
||||||
|
- groupBy v2 supports both array-based aggregation and hash-based aggregation. The array-based aggregation is used only
|
||||||
|
when the grouping key is a single indexed string column. In array-based aggregation, the dictionary-encoded value is used
|
||||||
|
as the index, so the aggregated values in the array can be accessed directly without finding buckets based on hashing.
|
||||||
|
|
||||||
|
### Memory tuning and resource limits
|
||||||
|
|
||||||
|
When using groupBy v2, three parameters control resource usage and limits:
|
||||||
|
|
||||||
|
- `druid.processing.buffer.sizeBytes`: size of the off-heap hash table used for aggregation, per query, in bytes. At
|
||||||
|
most `druid.processing.numMergeBuffers` of these will be created at once, which also serves as an upper limit on the
|
||||||
|
number of concurrently running groupBy queries.
|
||||||
|
- `druid.query.groupBy.maxMergingDictionarySize`: size of the on-heap dictionary used when grouping on strings, per query,
|
||||||
|
in bytes. Note that this is based on a rough estimate of the dictionary size, not the actual size.
|
||||||
|
- `druid.query.groupBy.maxOnDiskStorage`: amount of space on disk used for aggregation, per query, in bytes. By default,
|
||||||
|
this is 0, which means aggregation will not use disk.
|
||||||
|
|
||||||
|
If `maxOnDiskStorage` is 0 (the default) then a query that exceeds either the on-heap dictionary limit, or the off-heap
|
||||||
|
aggregation table limit, will fail with a "Resource limit exceeded" error describing the limit that was exceeded.
|
||||||
|
|
||||||
|
If `maxOnDiskStorage` is greater than 0, queries that exceed the in-memory limits will start using disk for aggregation.
|
||||||
|
In this case, when either the on-heap dictionary or off-heap hash table fills up, partially aggregated records will be
|
||||||
|
sorted and flushed to disk. Then, both in-memory structures will be cleared out for further aggregation. Queries that
|
||||||
|
then go on to exceed `maxOnDiskStorage` will fail with a "Resource limit exceeded" error indicating that they ran out of
|
||||||
|
disk space.
|
||||||
|
|
||||||
|
With groupBy v2, cluster operators should make sure that the off-heap hash tables and on-heap merging dictionaries
|
||||||
|
will not exceed available memory for the maximum possible concurrent query load (given by
|
||||||
|
`druid.processing.numMergeBuffers`). See the [basic cluster tuning guide](../operations/basic-cluster-tuning.md)
|
||||||
|
for more details about direct memory usage, organized by Druid process type.
|
||||||
|
|
||||||
|
Brokers do not need merge buffers for basic groupBy queries. Queries with subqueries (using a `query` dataSource) require one merge buffer if there is a single subquery, or two merge buffers if there is more than one layer of nested subqueries. Queries with [subtotals](groupbyquery.md#more-on-subtotalsspec) need one merge buffer. These can stack on top of each other: a groupBy query with multiple layers of nested subqueries, and that also uses subtotals, will need three merge buffers.
|
||||||
|
|
||||||
|
Historicals and ingestion tasks need one merge buffer for each groupBy query, unless [parallel combination](groupbyquery.md#parallel-combine) is enabled, in which case they need two merge buffers per query.
|
||||||
|
|
||||||
|
When using groupBy v1, all aggregation is done on-heap, and resource limits are done through the parameter
|
||||||
|
`druid.query.groupBy.maxResults`. This is a cap on the maximum number of results in a result set. Queries that exceed
|
||||||
|
this limit will fail with a "Resource limit exceeded" error indicating they exceeded their row limit. Cluster
|
||||||
|
operators should make sure that the on-heap aggregations will not exceed available JVM heap space for the expected
|
||||||
|
concurrent query load.
|
||||||
|
|
||||||
|
### Performance tuning for groupBy v2
|
||||||
|
|
||||||
|
#### Limit pushdown optimization
|
||||||
|
|
||||||
|
Druid pushes down the `limit` spec in groupBy queries to the segments on Historicals wherever possible to early prune unnecessary intermediate results and minimize the amount of data transferred to Brokers. By default, this technique is applied only when all fields in the `orderBy` spec is a subset of the grouping keys. This is because the `limitPushDown` doesn't guarantee the exact results if the `orderBy` spec includes any fields that are not in the grouping keys. However, you can enable this technique even in such cases if you can sacrifice some accuracy for fast query processing like in topN queries. See `forceLimitPushDown` in [advanced groupBy v2 configurations](#groupby-v2-configurations).
|
||||||
|
|
||||||
|
|
||||||
|
#### Optimizing hash table
|
||||||
|
|
||||||
|
The groupBy v2 engine uses an open addressing hash table for aggregation. The hash table is initialized with a given initial bucket number and gradually grows on buffer full. On hash collisions, the linear probing technique is used.
|
||||||
|
|
||||||
|
The default number of initial buckets is 1024 and the default max load factor of the hash table is 0.7. If you can see too many collisions in the hash table, you can adjust these numbers. See `bufferGrouperInitialBuckets` and `bufferGrouperMaxLoadFactor` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
|
||||||
|
|
||||||
|
|
||||||
|
#### Parallel combine
|
||||||
|
|
||||||
|
Once a Historical finishes aggregation using the hash table, it sorts the aggregated results and merges them before sending to the
|
||||||
|
Broker for N-way merge aggregation in the broker. By default, Historicals use all their available processing threads
|
||||||
|
(configured by `druid.processing.numThreads`) for aggregation, but use a single thread for sorting and merging
|
||||||
|
aggregates which is an http thread to send data to Brokers.
|
||||||
|
|
||||||
|
This is to prevent some heavy groupBy queries from blocking other queries. In Druid, the processing threads are shared
|
||||||
|
between all submitted queries and they are _not interruptible_. It means, if a heavy query takes all available
|
||||||
|
processing threads, all other queries might be blocked until the heavy query is finished. GroupBy queries usually take
|
||||||
|
longer time than timeseries or topN queries, they should release processing threads as soon as possible.
|
||||||
|
|
||||||
|
However, you might care about the performance of some really heavy groupBy queries. Usually, the performance bottleneck
|
||||||
|
of heavy groupBy queries is merging sorted aggregates. In such cases, you can use processing threads for it as well.
|
||||||
|
This is called _parallel combine_. To enable parallel combine, see `numParallelCombineThreads` in
|
||||||
|
[Advanced groupBy v2 configurations](#groupby-v2-configurations). Note that parallel combine can be enabled only when
|
||||||
|
data is actually spilled (see [Memory tuning and resource limits](#memory-tuning-and-resource-limits)).
|
||||||
|
|
||||||
|
Once parallel combine is enabled, the groupBy v2 engine can create a combining tree for merging sorted aggregates. Each
|
||||||
|
intermediate node of the tree is a thread merging aggregates from the child nodes. The leaf node threads read and merge
|
||||||
|
aggregates from hash tables including spilled ones. Usually, leaf processes are slower than intermediate nodes because they
|
||||||
|
need to read data from disk. As a result, less threads are used for intermediate nodes by default. You can change the
|
||||||
|
degree of intermediate nodes. See `intermediateCombineDegree` in [Advanced groupBy v2 configurations](#groupby-v2-configurations).
|
||||||
|
|
||||||
|
Please note that each Historical needs two merge buffers to process a groupBy v2 query with parallel combine: one for
|
||||||
|
computing intermediate aggregates from each segment and another for combining intermediate aggregates in parallel.
|
||||||
|
|
||||||
|
|
||||||
|
### Alternatives
|
||||||
|
|
||||||
|
There are some situations where other query types may be a better choice than groupBy.
|
||||||
|
|
||||||
|
- For queries with no "dimensions" (i.e. grouping by time only) the [Timeseries query](timeseriesquery.md) will
|
||||||
|
generally be faster than groupBy. The major differences are that it is implemented in a fully streaming manner (taking
|
||||||
|
advantage of the fact that segments are already sorted on time) and does not need to use a hash table for merging.
|
||||||
|
|
||||||
|
- For queries with a single "dimensions" element (i.e. grouping by one string dimension), the [TopN query](topnquery.md)
|
||||||
|
will sometimes be faster than groupBy. This is especially true if you are ordering by a metric and find approximate
|
||||||
|
results acceptable.
|
||||||
|
|
||||||
|
### Nested groupBys
|
||||||
|
|
||||||
|
Nested groupBys (dataSource of type "query") are performed differently for "v1" and "v2". The Broker first runs the
|
||||||
|
inner groupBy query in the usual way. "v1" strategy then materializes the inner query's results on-heap with Druid's
|
||||||
|
indexing mechanism, and runs the outer query on these materialized results. "v2" strategy runs the outer query on the
|
||||||
|
inner query's results stream with off-heap fact map and on-heap string dictionary that can spill to disk. Both
|
||||||
|
strategy perform the outer query on the Broker in a single-threaded fashion.
|
||||||
|
|
||||||
|
### Configurations
|
||||||
|
|
||||||
|
This section describes the configurations for groupBy queries. You can set the runtime properties in the `runtime.properties` file on Broker, Historical, and MiddleManager processes. You can set the query context parameters through the [query context](query-context.md).
|
||||||
|
|
||||||
|
#### Configurations for groupBy v2
|
||||||
|
|
||||||
|
Supported runtime properties:
|
||||||
|
|
||||||
|
|Property|Description|Default|
|
||||||
|
|--------|-----------|-------|
|
||||||
|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
|
||||||
|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
|
||||||
|
|
||||||
|
Supported query contexts:
|
||||||
|
|
||||||
|
|Key|Description|
|
||||||
|
|---|-----------|
|
||||||
|
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|
||||||
|
|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
|
||||||
|
|
||||||
|
|
||||||
|
### Advanced configurations
|
||||||
|
|
||||||
|
#### Common configurations for all groupBy strategies
|
||||||
|
|
||||||
|
Supported runtime properties:
|
||||||
|
|
||||||
|
|Property|Description|Default|
|
||||||
|
|--------|-----------|-------|
|
||||||
|
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v2|
|
||||||
|
|`druid.query.groupBy.singleThreaded`|Merge results using a single thread.|false|
|
||||||
|
|
||||||
|
Supported query contexts:
|
||||||
|
|
||||||
|
|Key|Description|
|
||||||
|
|---|-----------|
|
||||||
|
|`groupByStrategy`|Overrides the value of `druid.query.groupBy.defaultStrategy` for this query.|
|
||||||
|
|`groupByIsSingleThreaded`|Overrides the value of `druid.query.groupBy.singleThreaded` for this query.|
|
||||||
|
|
||||||
|
|
||||||
|
#### GroupBy v2 configurations
|
||||||
|
|
||||||
|
Supported runtime properties:
|
||||||
|
|
||||||
|
|Property|Description|Default|
|
||||||
|
|--------|-----------|-------|
|
||||||
|
|`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default (1024).|0|
|
||||||
|
|`druid.query.groupBy.bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default (0.7).|0|
|
||||||
|
|`druid.query.groupBy.forceHashAggregation`|Force to use hash-based aggregation.|false|
|
||||||
|
|`druid.query.groupBy.intermediateCombineDegree`|Number of intermediate nodes combined together in the combining tree. Higher degrees will need less threads which might be helpful to improve the query performance by reducing the overhead of too many threads if the server has sufficiently powerful cpu cores.|8|
|
||||||
|
|`druid.query.groupBy.numParallelCombineThreads`|Hint for the number of parallel combining threads. This should be larger than 1 to turn on the parallel combining feature. The actual number of threads used for parallel combining is min(`druid.query.groupBy.numParallelCombineThreads`, `druid.processing.numThreads`).|1 (disabled)|
|
||||||
|
|`druid.query.groupBy.applyLimitPushDownToSegment`|If Broker pushes limit down to queryable data server (historicals, peons) then limit results during segment scan. If typically there are a large number of segments taking part in a query on a data server, this setting may counterintuitively reduce performance if enabled.|false (disabled)|
|
||||||
|
|
||||||
|
Supported query contexts:
|
||||||
|
|
||||||
|
|Key|Description|Default|
|
||||||
|
|---|-----------|-------|
|
||||||
|
|`bufferGrouperInitialBuckets`|Overrides the value of `druid.query.groupBy.bufferGrouperInitialBuckets` for this query.|None|
|
||||||
|
|`bufferGrouperMaxLoadFactor`|Overrides the value of `druid.query.groupBy.bufferGrouperMaxLoadFactor` for this query.|None|
|
||||||
|
|`forceHashAggregation`|Overrides the value of `druid.query.groupBy.forceHashAggregation`|None|
|
||||||
|
|`intermediateCombineDegree`|Overrides the value of `druid.query.groupBy.intermediateCombineDegree`|None|
|
||||||
|
|`numParallelCombineThreads`|Overrides the value of `druid.query.groupBy.numParallelCombineThreads`|None|
|
||||||
|
|`sortByDimsFirst`|Sort the results first by dimension values and then by timestamp.|false|
|
||||||
|
|`forceLimitPushDown`|When all fields in the orderby are part of the grouping key, the Broker will push limit application down to the Historical processes. When the sorting order uses fields that are not in the grouping key, applying this optimization can result in approximate results with unknown accuracy, so this optimization is disabled by default in that case. Enabling this context flag turns on limit push down for limit/orderbys that contain non-grouping key columns.|false|
|
||||||
|
|`applyLimitPushDownToSegment`|If Broker pushes limit down to queryable nodes (historicals, peons) then limit results during segment scan. This context value can be used to override `druid.query.groupBy.applyLimitPushDownToSegment`.|true|
|
||||||
|
|
||||||
|
|
||||||
|
#### GroupBy v1 configurations
|
||||||
|
|
||||||
|
Supported runtime properties:
|
||||||
|
|
||||||
|
|Property|Description|Default|
|
||||||
|
|--------|-----------|-------|
|
||||||
|
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows for the per-segment grouping engine. This is a tuning parameter that does not impose a hard limit; rather, it potentially shifts merging work from the per-segment engine to the overall merging index. Queries that exceed this limit will not fail.|50000|
|
||||||
|
|`druid.query.groupBy.maxResults`|Maximum number of results. Queries that exceed this limit will fail.|500000|
|
||||||
|
|
||||||
|
Supported query contexts:
|
||||||
|
|
||||||
|
|Key|Description|Default|
|
||||||
|
|---|-----------|-------|
|
||||||
|
|`maxIntermediateRows`|Can be used to lower the value of `druid.query.groupBy.maxIntermediateRows` for this query.|None|
|
||||||
|
|`maxResults`|Can be used to lower the value of `druid.query.groupBy.maxResults` for this query.|None|
|
||||||
|
|`useOffheap`|Set to true to store aggregations off-heap when merging results.|false|
|
||||||
|
|
||||||
|
#### Array based result rows
|
||||||
|
|
||||||
|
Internally Druid always uses an array based representation of groupBy result rows, but by default this is translated
|
||||||
|
into a map based result format at the Broker. To reduce the overhead of this translation, results may also be returned
|
||||||
|
from the Broker directly in the array based format if `resultAsArray` is set to `true` on the query context.
|
||||||
|
|
||||||
|
Each row is positional, and has the following fields, in order:
|
||||||
|
|
||||||
|
* Timestamp (optional; only if granularity != ALL)
|
||||||
|
* Dimensions (in order)
|
||||||
|
* Aggregators (in order)
|
||||||
|
* Post-aggregators (optional; in order, if present)
|
||||||
|
|
||||||
|
This schema is not available on the response, so it must be computed from the issued query in order to properly read
|
||||||
|
the results.
|
|
@ -0,0 +1,118 @@
|
||||||
|
---
|
||||||
|
id: query-execution
|
||||||
|
title: "Query execution"
|
||||||
|
---
|
||||||
|
|
||||||
|
<!--
|
||||||
|
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
~ or more contributor license agreements. See the NOTICE file
|
||||||
|
~ distributed with this work for additional information
|
||||||
|
~ regarding copyright ownership. The ASF licenses this file
|
||||||
|
~ to you under the Apache License, Version 2.0 (the
|
||||||
|
~ "License"); you may not use this file except in compliance
|
||||||
|
~ with the License. You may obtain a copy of the License at
|
||||||
|
~
|
||||||
|
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
~
|
||||||
|
~ Unless required by applicable law or agreed to in writing,
|
||||||
|
~ software distributed under the License is distributed on an
|
||||||
|
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
~ KIND, either express or implied. See the License for the
|
||||||
|
~ specific language governing permissions and limitations
|
||||||
|
~ under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
> This document describes how Druid executes [native queries](querying.md), but since [Druid SQL](sql.md) queries
|
||||||
|
> are translated to native queries, this document applies to the SQL runtime as well. Refer to the SQL
|
||||||
|
> [Query translation](sql.md#query-translation) page for information about how SQL queries are translated to native
|
||||||
|
> queries.
|
||||||
|
|
||||||
|
Druid's approach to query execution varies depending on the kind of [datasource](datasource.md) you are querying.
|
||||||
|
|
||||||
|
## Datasource type
|
||||||
|
|
||||||
|
### `table`
|
||||||
|
|
||||||
|
Queries that operate directly on [table datasources](datasource.md#table) are executed using a scatter-gather approach
|
||||||
|
led by the Broker process. The process looks like this:
|
||||||
|
|
||||||
|
1. The Broker identifies which [segments](../design/segments.md) are relevant to the query based on the `"intervals"`
|
||||||
|
parameter. Segments are always partitioned by time, so any segment whose interval overlaps the query interval is
|
||||||
|
potentially relevant.
|
||||||
|
|
||||||
|
2. The Broker may additionally further prune the segment list based on the `"filter"`, if the input data was partitioned
|
||||||
|
by range using the [`single_dim` partitionsSpec](../ingestion/native-batch.md#partitionsspec), and if the filter matches
|
||||||
|
the dimension used for partitioning.
|
||||||
|
|
||||||
|
3. The Broker, having pruned the list of segments for the query, forwards the query to data servers (like Historicals
|
||||||
|
and tasks running on MiddleManagers) that are currently serving those segments.
|
||||||
|
|
||||||
|
4. For all query types except [Scan](scan-query.md), data servers process each segment in parallel and generate partial
|
||||||
|
results for each segment. The specific processing that is done depends on the query type. These partial results may be
|
||||||
|
cached if [query caching](caching.md) is enabled. For Scan queries, segments are processed in order by a single thread,
|
||||||
|
and results are not cached.
|
||||||
|
|
||||||
|
5. The Broker receives partial results from each data server, merges them into the final result set, and returns them
|
||||||
|
to the caller. For Timeseries and Scan queries, and for GroupBy queries where there is no sorting, the Broker is able to
|
||||||
|
do this in a streaming fashion. Otherwise, the Broker fully computes the result set before returning anything.
|
||||||
|
|
||||||
|
### `lookup`
|
||||||
|
|
||||||
|
Queries that operate directly on [lookup datasources](datasource.md#lookup) (without a join) are executed on the Broker
|
||||||
|
that received the query, using its local copy of the lookup. All registered lookup tables are preloaded in-memory on the
|
||||||
|
Broker. The query runs single-threaded.
|
||||||
|
|
||||||
|
Execution of queries that use lookups as right-hand inputs to a join are executed in a way that depends on their
|
||||||
|
"base" (bottom-leftmost) datasource, as described in the [join](#join) section below.
|
||||||
|
|
||||||
|
### `union`
|
||||||
|
|
||||||
|
Queries that operate directly on [union datasources](datasource.md#union) are split up on the Broker into a separate
|
||||||
|
query for each table that is part of the union. Each of these queries runs separately, and the Broker merges their
|
||||||
|
results together.
|
||||||
|
|
||||||
|
### `inline`
|
||||||
|
|
||||||
|
Queries that operate directly on [inline datasources](datasource.md#inline) are executed on the Broker that received the
|
||||||
|
query. The query runs single-threaded.
|
||||||
|
|
||||||
|
Execution of queries that use inline datasources as right-hand inputs to a join are executed in a way that depends on
|
||||||
|
their "base" (bottom-leftmost) datasource, as described in the [join](#join) section below.
|
||||||
|
|
||||||
|
### `query`
|
||||||
|
|
||||||
|
[Query datasources](datasource.md#query) are subqueries. Each subquery is executed as if it was its own query and
|
||||||
|
the results are brought back to the Broker. Then, the Broker continues on with the rest of the query as if the subquery
|
||||||
|
was replaced with an inline datasource.
|
||||||
|
|
||||||
|
In most cases, subquery results are fully buffered in memory on the Broker before the rest of the query proceeds,
|
||||||
|
meaning subqueries execute sequentially. The total number of rows buffered across all subqueries of a given query
|
||||||
|
in this way cannot exceed the [`druid.server.http.maxSubqueryRows` property](../configuration/index.md).
|
||||||
|
|
||||||
|
There is one exception: if the outer query and all subqueries are the [groupBy](groupbyquery.md) type, then subquery
|
||||||
|
results can be processed in a streaming fashion and the `druid.server.http.maxSubqueryRows` limit does not apply.
|
||||||
|
|
||||||
|
### `join`
|
||||||
|
|
||||||
|
[Join datasources](datasource.md#join) are handled using a broadcast hash-join approach.
|
||||||
|
|
||||||
|
1. The Broker executes any subqueries that are inputs the join, as described in the [query](#query) section, and
|
||||||
|
replaces them with inline datasources.
|
||||||
|
|
||||||
|
2. The Broker flattens a join tree, if present, into a "base" datasource (the bottom-leftmost one) and other leaf
|
||||||
|
datasources (the rest).
|
||||||
|
|
||||||
|
3. Query execution proceeds using the same structure that the base datasource would use on its own. If the base
|
||||||
|
datasource is a [table](#table), segments are pruned based on `"intervals"` as usual, and the query is executed on the
|
||||||
|
cluster by forwarding it to all relevant data servers in parallel. If the base datasource is a [lookup](#lookup) or
|
||||||
|
[inline](#inline) datasource (including an inline datasource that was the result of inlining a subquery), the query is
|
||||||
|
executed on the Broker itself. The base query cannot be a union, because unions are not currently supported as inputs to
|
||||||
|
a join.
|
||||||
|
|
||||||
|
4. Before beginning to process the base datasource, the server(s) that will execute the query first inspect all the
|
||||||
|
non-base leaf datasources to determine if a new hash table needs to be built for the upcoming hash join. Currently,
|
||||||
|
lookups do not require new hash tables to be built (because they are preloaded), but inline datasources do.
|
||||||
|
|
||||||
|
5. Query execution proceeds again using the same structure that the base datasource would use on its own, with one
|
||||||
|
addition: while processing the base datasource, Druid servers will use the hash tables built from the other join inputs
|
||||||
|
to produce the join result row-by-row, and query engines will operate on the joined rows rather than the base rows.
|
|
@ -0,0 +1,125 @@
|
||||||
|
# 原生查询
|
||||||
|
|
||||||
|
> Apache Druid supports two query languages: [Druid SQL](../querying/sql.md) and [native queries](../querying/querying.md).
|
||||||
|
> This document describes the
|
||||||
|
> native query language. For information about how Druid SQL chooses which native query types to use when
|
||||||
|
> it runs a SQL query, refer to the [SQL documentation](../querying/sql.md#query-types).
|
||||||
|
|
||||||
|
Native queries in Druid are JSON objects and are typically issued to the Broker or Router processes. Queries can be
|
||||||
|
posted like this:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -X POST '<queryable_host>:<port>/druid/v2/?pretty' -H 'Content-Type:application/json' -H 'Accept:application/json' -d @<query_json_file>
|
||||||
|
```
|
||||||
|
|
||||||
|
> Replace `<queryable_host>:<port>` with the appropriate address and port for your system. For example, if running the quickstart configuration, replace `<queryable_host>:<port>` with localhost:8888.
|
||||||
|
|
||||||
|
You can also enter them directly in the Druid console's Query view. Simply pasting a native query into the console switches the editor into JSON mode.
|
||||||
|
|
||||||
|
![Native query](../assets/native-queries-01.png "Native query")
|
||||||
|
|
||||||
|
Druid's native query language is JSON over HTTP, although many members of the community have contributed different
|
||||||
|
[client libraries](https://druid.apache.org/libraries.html) in other languages to query Druid.
|
||||||
|
|
||||||
|
The Content-Type/Accept Headers can also take 'application/x-jackson-smile'.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -X POST '<queryable_host>:<port>/druid/v2/?pretty' -H 'Content-Type:application/json' -H 'Accept:application/x-jackson-smile' -d @<query_json_file>
|
||||||
|
```
|
||||||
|
|
||||||
|
> If the Accept header is not provided, it defaults to the value of 'Content-Type' header.
|
||||||
|
|
||||||
|
Druid's native query is relatively low level, mapping closely to how computations are performed internally. Druid queries
|
||||||
|
are designed to be lightweight and complete very quickly. This means that for more complex analysis, or to build
|
||||||
|
more complex visualizations, multiple Druid queries may be required.
|
||||||
|
|
||||||
|
Even though queries are typically made to Brokers or Routers, they can also be accepted by
|
||||||
|
[Historical](../design/historical.md) processes and by [Peons (task JVMs)](../design/peons.md)) that are running
|
||||||
|
stream ingestion tasks. This may be valuable if you want to query results for specific segments that are served by
|
||||||
|
specific processes.
|
||||||
|
|
||||||
|
## Available queries
|
||||||
|
|
||||||
|
Druid has numerous query types for various use cases. Queries are composed of various JSON properties and Druid has different types of queries for different use cases. The documentation for the various query types describe all the JSON properties that can be set.
|
||||||
|
|
||||||
|
### Aggregation queries
|
||||||
|
|
||||||
|
* [Timeseries](../querying/timeseriesquery.md)
|
||||||
|
* [TopN](../querying/topnquery.md)
|
||||||
|
* [GroupBy](../querying/groupbyquery.md)
|
||||||
|
|
||||||
|
### Metadata queries
|
||||||
|
|
||||||
|
* [TimeBoundary](../querying/timeboundaryquery.md)
|
||||||
|
* [SegmentMetadata](../querying/segmentmetadataquery.md)
|
||||||
|
* [DatasourceMetadata](../querying/datasourcemetadataquery.md)
|
||||||
|
|
||||||
|
### Other queries
|
||||||
|
|
||||||
|
* [Scan](../querying/scan-query.md)
|
||||||
|
* [Search](../querying/searchquery.md)
|
||||||
|
|
||||||
|
## Which query type should I use?
|
||||||
|
|
||||||
|
For aggregation queries, if more than one would satisfy your needs, we generally recommend using Timeseries or TopN
|
||||||
|
whenever possible, as they are specifically optimized for their use cases. If neither is a good fit, you should use
|
||||||
|
the GroupBy query, which is the most flexible.
|
||||||
|
|
||||||
|
## Query cancellation
|
||||||
|
|
||||||
|
Queries can be cancelled explicitly using their unique identifier. If the
|
||||||
|
query identifier is set at the time of query, or is otherwise known, the following
|
||||||
|
endpoint can be used on the Broker or Router to cancel the query.
|
||||||
|
|
||||||
|
```sh
|
||||||
|
DELETE /druid/v2/{queryId}
|
||||||
|
```
|
||||||
|
|
||||||
|
For example, if the query ID is `abc123`, the query can be cancelled as follows:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
curl -X DELETE "http://host:port/druid/v2/abc123"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Query errors
|
||||||
|
|
||||||
|
### Authentication and authorization failures
|
||||||
|
|
||||||
|
For [secured](../design/auth.md) Druid clusters, query requests respond with an HTTP 401 response code in case of an authentication failure. For authorization failures, an HTTP 403 response code is returned.
|
||||||
|
|
||||||
|
### Query execution failures
|
||||||
|
|
||||||
|
If a query fails, Druid returns a response with an HTTP response code and a JSON object with the following structure:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"error" : "Query timeout",
|
||||||
|
"errorMessage" : "Timeout waiting for task.",
|
||||||
|
"errorClass" : "java.util.concurrent.TimeoutException",
|
||||||
|
"host" : "druid1.example.com:8083"
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The fields in the response are:
|
||||||
|
|
||||||
|
|field|description|
|
||||||
|
|-----|-----------|
|
||||||
|
|error|A well-defined error code (see below).|
|
||||||
|
|errorMessage|A free-form message with more information about the error. May be null.|
|
||||||
|
|errorClass|The class of the exception that caused this error. May be null.|
|
||||||
|
|host|The host on which this error occurred. May be null.|
|
||||||
|
|
||||||
|
Possible Druid error codes for the `error` field include:
|
||||||
|
|
||||||
|
|Error code|HTTP response code|description|
|
||||||
|
|----|-----------|-----------|
|
||||||
|
|`SQL parse failed`|400|Only for SQL queries. The SQL query failed to parse.|
|
||||||
|
|`Plan validation failed`|400|Only for SQL queries. The SQL query failed to validate.|
|
||||||
|
|`Resource limit exceeded`|400|The query exceeded a configured resource limit (e.g. groupBy maxResults).|
|
||||||
|
|`Query capacity exceeded`|429|The query failed to execute because of the lack of resources available at the time when the query was submitted. The resources could be any runtime resources such as [query scheduler lane capacity](../configuration/index.md#query-prioritization-and-laning), merge buffers, and so on. The error message should have more details about the failure.|
|
||||||
|
|`Unsupported operation`|501|The query attempted to perform an unsupported operation. This may occur when using undocumented features or when using an incompletely implemented extension.|
|
||||||
|
|`Query timeout`|504|The query timed out.|
|
||||||
|
|`Query interrupted`|500|The query was interrupted, possibly due to JVM shutdown.|
|
||||||
|
|`Query cancelled`|500|The query was cancelled through the query cancellation API.|
|
||||||
|
|`Truncated response context`|500|An intermediate response context for the query exceeded the built-in limit of 7KiB.<br/><br/>The response context is an internal data structure that Druid servers use to share out-of-band information when sending query results to each other. It is serialized in an HTTP header with a maximum length of 7KiB. This error occurs when an intermediate response context sent from a data server (like a Historical) to the Broker exceeds this limit.<br/><br/>The response context is used for a variety of purposes, but the one most likely to generate a large context is sharing details about segments that move during a query. That means this error can potentially indicate that a very large number of segments moved in between the time a Broker issued a query and the time it was processed on Historicals. This should rarely, if ever, occur during normal operation.|
|
||||||
|
|`Unknown exception`|500|Some other exception occurred. Check errorMessage and errorClass for details, although keep in mind that the contents of those fields are free-form and may change from release to release.|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,261 @@
|
||||||
|
---
|
||||||
|
id: topnquery
|
||||||
|
title: "TopN queries"
|
||||||
|
sidebar_label: "TopN"
|
||||||
|
---
|
||||||
|
|
||||||
|
<!--
|
||||||
|
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
~ or more contributor license agreements. See the NOTICE file
|
||||||
|
~ distributed with this work for additional information
|
||||||
|
~ regarding copyright ownership. The ASF licenses this file
|
||||||
|
~ to you under the Apache License, Version 2.0 (the
|
||||||
|
~ "License"); you may not use this file except in compliance
|
||||||
|
~ with the License. You may obtain a copy of the License at
|
||||||
|
~
|
||||||
|
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
~
|
||||||
|
~ Unless required by applicable law or agreed to in writing,
|
||||||
|
~ software distributed under the License is distributed on an
|
||||||
|
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
~ KIND, either express or implied. See the License for the
|
||||||
|
~ specific language governing permissions and limitations
|
||||||
|
~ under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
> Apache Druid supports two query languages: [Druid SQL](sql.md) and [native queries](querying.md).
|
||||||
|
> This document describes a query
|
||||||
|
> type in the native language. For information about when Druid SQL will use this query type, refer to the
|
||||||
|
> [SQL documentation](sql.md#query-types).
|
||||||
|
|
||||||
|
Apache Druid TopN queries return a sorted set of results for the values in a given dimension according to some criteria. Conceptually, they can be thought of as an approximate [GroupByQuery](../querying/groupbyquery.md) over a single dimension with an [Ordering](../querying/limitspec.md) spec. TopNs are much faster and resource efficient than GroupBys for this use case. These types of queries take a topN query object and return an array of JSON objects where each object represents a value asked for by the topN query.
|
||||||
|
|
||||||
|
TopNs are approximate in that each data process will rank their top K results and only return those top K results to the Broker. K, by default in Druid, is `max(1000, threshold)`. In practice, this means that if you ask for the top 1000 items ordered, the correctness of the first ~900 items will be 100%, and the ordering of the results after that is not guaranteed. TopNs can be made more accurate by increasing the threshold.
|
||||||
|
|
||||||
|
A topN query object looks like:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"queryType": "topN",
|
||||||
|
"dataSource": "sample_data",
|
||||||
|
"dimension": "sample_dim",
|
||||||
|
"threshold": 5,
|
||||||
|
"metric": "count",
|
||||||
|
"granularity": "all",
|
||||||
|
"filter": {
|
||||||
|
"type": "and",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"type": "selector",
|
||||||
|
"dimension": "dim1",
|
||||||
|
"value": "some_value"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "selector",
|
||||||
|
"dimension": "dim2",
|
||||||
|
"value": "some_other_val"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"aggregations": [
|
||||||
|
{
|
||||||
|
"type": "longSum",
|
||||||
|
"name": "count",
|
||||||
|
"fieldName": "count"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "doubleSum",
|
||||||
|
"name": "some_metric",
|
||||||
|
"fieldName": "some_metric"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"postAggregations": [
|
||||||
|
{
|
||||||
|
"type": "arithmetic",
|
||||||
|
"name": "average",
|
||||||
|
"fn": "/",
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"type": "fieldAccess",
|
||||||
|
"name": "some_metric",
|
||||||
|
"fieldName": "some_metric"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "fieldAccess",
|
||||||
|
"name": "count",
|
||||||
|
"fieldName": "count"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"intervals": [
|
||||||
|
"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
There are 11 parts to a topN query.
|
||||||
|
|
||||||
|
|property|description|required?|
|
||||||
|
|--------|-----------|---------|
|
||||||
|
|queryType|This String should always be "topN"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|
||||||
|
|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.md) for more information.|yes|
|
||||||
|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|
||||||
|
|granularity|Defines the granularity to bucket query results. See [Granularities](../querying/granularities.md)|yes|
|
||||||
|
|filter|See [Filters](../querying/filters.md)|no|
|
||||||
|
|aggregations|See [Aggregations](../querying/aggregations.md)|for numeric metricSpec, aggregations or postAggregations should be specified. Otherwise no.|
|
||||||
|
|postAggregations|See [Post Aggregations](../querying/post-aggregations.md)|for numeric metricSpec, aggregations or postAggregations should be specified. Otherwise no.|
|
||||||
|
|dimension|A String or JSON object defining the dimension that you want the top taken for. For more info, see [DimensionSpecs](../querying/dimensionspecs.md)|yes|
|
||||||
|
|threshold|An integer defining the N in the topN (i.e. how many results you want in the top list)|yes|
|
||||||
|
|metric|A String or JSON object specifying the metric to sort by for the top list. For more info, see [TopNMetricSpec](../querying/topnmetricspec.md).|yes|
|
||||||
|
|context|See [Context](../querying/query-context.md)|no|
|
||||||
|
|
||||||
|
Please note the context JSON object is also available for topN queries and should be used with the same caution as the timeseries case.
|
||||||
|
The format of the results would look like so:
|
||||||
|
|
||||||
|
```json
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"timestamp": "2013-08-31T00:00:00.000Z",
|
||||||
|
"result": [
|
||||||
|
{
|
||||||
|
"dim1": "dim1_val",
|
||||||
|
"count": 111,
|
||||||
|
"some_metrics": 10669,
|
||||||
|
"average": 96.11711711711712
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"dim1": "another_dim1_val",
|
||||||
|
"count": 88,
|
||||||
|
"some_metrics": 28344,
|
||||||
|
"average": 322.09090909090907
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"dim1": "dim1_val3",
|
||||||
|
"count": 70,
|
||||||
|
"some_metrics": 871,
|
||||||
|
"average": 12.442857142857143
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"dim1": "dim1_val4",
|
||||||
|
"count": 62,
|
||||||
|
"some_metrics": 815,
|
||||||
|
"average": 13.14516129032258
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"dim1": "dim1_val5",
|
||||||
|
"count": 60,
|
||||||
|
"some_metrics": 2787,
|
||||||
|
"average": 46.45
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
## Behavior on multi-value dimensions
|
||||||
|
|
||||||
|
topN queries can group on multi-value dimensions. When grouping on a multi-value dimension, _all_ values
|
||||||
|
from matching rows will be used to generate one group per value. It's possible for a query to return more groups than
|
||||||
|
there are rows. For example, a topN on the dimension `tags` with filter `"t1" AND "t3"` would match only row1, and
|
||||||
|
generate a result with three groups: `t1`, `t2`, and `t3`. If you only need to include values that match
|
||||||
|
your filter, you can use a [filtered dimensionSpec](dimensionspecs.md#filtered-dimensionspecs). This can also
|
||||||
|
improve performance.
|
||||||
|
|
||||||
|
See [Multi-value dimensions](multi-value-dimensions.md) for more details.
|
||||||
|
|
||||||
|
## Aliasing
|
||||||
|
|
||||||
|
The current TopN algorithm is an approximate algorithm. The top 1000 local results from each segment are returned for merging to determine the global topN. As such, the topN algorithm is approximate in both rank and results. Approximate results *ONLY APPLY WHEN THERE ARE MORE THAN 1000 DIM VALUES*. A topN over a dimension with fewer than 1000 unique dimension values can be considered accurate in rank and accurate in aggregates.
|
||||||
|
|
||||||
|
The threshold can be modified from its default 1000 via the server parameter `druid.query.topN.minTopNThreshold`, which needs a restart of the servers to take effect, or via `minTopNThreshold` in the query context, which takes effect per query.
|
||||||
|
|
||||||
|
If you are wanting the top 100 of a high cardinality, uniformly distributed dimension ordered by some low-cardinality, uniformly distributed dimension, you are potentially going to get aggregates back that are missing data.
|
||||||
|
|
||||||
|
To put it another way, the best use cases for topN are when you can have confidence that the overall results are uniformly in the top. For example, if a particular site ID is in the top 10 for some metric for every hour of every day, then it will probably be accurate in the topN over multiple days. But if a site is barely in the top 1000 for any given hour, but over the whole query granularity is in the top 500 (example: a site which gets highly uniform traffic co-mingling in the dataset with sites with highly periodic data), then a top500 query may not have that particular site at the exact rank, and may not be accurate for that particular site's aggregates.
|
||||||
|
|
||||||
|
Before continuing in this section, please consider if you really need exact results. Getting exact results is a very resource intensive process. For the vast majority of "useful" data results, an approximate topN algorithm supplies plenty of accuracy.
|
||||||
|
|
||||||
|
Users wishing to get an *exact rank and exact aggregates* topN over a dimension with greater than 1000 unique values should issue a groupBy query and sort the results themselves. This is very computationally expensive for high-cardinality dimensions.
|
||||||
|
|
||||||
|
Users who can tolerate *approximate rank* topN over a dimension with greater than 1000 unique values, but require *exact aggregates* can issue two queries. One to get the approximate topN dimension values, and another topN with dimension selection filters which only use the topN results of the first.
|
||||||
|
|
||||||
|
### Example First query
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"aggregations": [
|
||||||
|
{
|
||||||
|
"fieldName": "L_QUANTITY_longSum",
|
||||||
|
"name": "L_QUANTITY_",
|
||||||
|
"type": "longSum"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"dataSource": "tpch_year",
|
||||||
|
"dimension":"l_orderkey",
|
||||||
|
"granularity": "all",
|
||||||
|
"intervals": [
|
||||||
|
"1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z"
|
||||||
|
],
|
||||||
|
"metric": "L_QUANTITY_",
|
||||||
|
"queryType": "topN",
|
||||||
|
"threshold": 2
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Example second query
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"aggregations": [
|
||||||
|
{
|
||||||
|
"fieldName": "L_TAX_doubleSum",
|
||||||
|
"name": "L_TAX_",
|
||||||
|
"type": "doubleSum"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "L_DISCOUNT_doubleSum",
|
||||||
|
"name": "L_DISCOUNT_",
|
||||||
|
"type": "doubleSum"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "L_EXTENDEDPRICE_doubleSum",
|
||||||
|
"name": "L_EXTENDEDPRICE_",
|
||||||
|
"type": "doubleSum"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"fieldName": "L_QUANTITY_longSum",
|
||||||
|
"name": "L_QUANTITY_",
|
||||||
|
"type": "longSum"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "count",
|
||||||
|
"type": "count"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"dataSource": "tpch_year",
|
||||||
|
"dimension":"l_orderkey",
|
||||||
|
"filter": {
|
||||||
|
"fields": [
|
||||||
|
{
|
||||||
|
"dimension": "l_orderkey",
|
||||||
|
"type": "selector",
|
||||||
|
"value": "103136"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"dimension": "l_orderkey",
|
||||||
|
"type": "selector",
|
||||||
|
"value": "1648672"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"type": "or"
|
||||||
|
},
|
||||||
|
"granularity": "all",
|
||||||
|
"intervals": [
|
||||||
|
"1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z"
|
||||||
|
],
|
||||||
|
"metric": "L_QUANTITY_",
|
||||||
|
"queryType": "topN",
|
||||||
|
"threshold": 2
|
||||||
|
}
|
||||||
|
```
|
Loading…
Reference in New Issue