`APPROX_COUNT_DISTINCT_DS_HLL`, `APPROX_COUNT_DISTINCT_DS_THETA`, and `DS_QUANTILES_SKETCH` (but switch to
`APPROX_QUANTILE_DS` at query time). Do not use `AVG`; instead, use `SUM` and `COUNT` at ingest time and compute the
quotient at query time.
For an example, see [INSERT with rollup example](examples.md#insert-with-rollup).
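As a minimal sketch of this `AVG` pattern, using hypothetical datasource and column names (`sales_raw`, `sales_rollup`,
`product`, `revenue`), the ingest-time query stores `SUM` and `COUNT`:

```sql
-- Hypothetical ingest-time query: store SUM and COUNT instead of AVG.
INSERT INTO "sales_rollup"
SELECT
  TIME_FLOOR("__time", 'PT1H') AS "__time",
  "product",
  SUM("revenue") AS "sum_revenue",
  COUNT(*) AS "cnt"
FROM "sales_raw"
GROUP BY 1, 2
PARTITIONED BY DAY
```

At query time, compute the average as the quotient of the re-aggregated columns:

```sql
-- Hypothetical query-time statement: derive the average from SUM and COUNT.
SELECT
  "product",
  SUM("sum_revenue") / SUM("cnt") AS "avg_revenue"
FROM "sales_rollup"
GROUP BY "product"
```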
## Multi-stage query tasks
### Execution flow
When you execute a SQL statement using the task endpoint [`/druid/v2/sql/task`](api.md#submit-a-query), the following
happens:
1. The Broker plans your SQL query into a native query, as usual.
2. The Broker wraps the native query into a task of type `query_controller`
and submits it to the indexing service.
3. The Broker returns the task ID to you and exits.
4. The controller task launches some number of worker tasks determined by
the `maxNumTasks` and `taskAssignment` [context parameters](./reference.md#context-parameters), which you can set individually for each query.
5. Worker tasks of type `query_worker` execute the query.
6. If the query is a SELECT query, the worker tasks send the results
back to the controller task, which writes them into its task report.
If the query is an INSERT or REPLACE query, the worker tasks generate and
publish new Druid segments to the provided datasource.
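For example, a SELECT statement like the following (hypothetical datasource and column names), submitted to
`/druid/v2/sql/task`, runs as a `query_controller` task with its worker tasks, and the result rows end up in the
controller's task report rather than in the response to the original HTTP request:

```sql
-- Hypothetical SELECT example: when run through the task endpoint, the
-- results are written to the controller task's report, not streamed back
-- on the connection that submitted the query.
SELECT
  "channel",
  COUNT(*) AS "row_count"
FROM "wikipedia"
GROUP BY "channel"
```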
### Parallelism
The [`maxNumTasks`](./reference.md#context-parameters) query parameter determines the maximum number of tasks your
query will use, including the one `query_controller` task. Generally, queries perform better with more workers. The
lowest possible value of `maxNumTasks` is two (one worker and one controller). Do not set this higher than the number of
free slots available in your cluster; doing so will result in a [TaskStartTimeout](reference.md#error-codes) error.
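For example, setting `maxNumTasks` to 5 (a hypothetical value) allows one `query_controller` task plus up to four
`query_worker` tasks, so the query occupies at most five task slots in the cluster.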
When [reading external data](#extern), EXTERN can read multiple files in parallel across
different worker tasks. However, EXTERN does not split individual files across multiple worker tasks. If you have a
small number of very large input files, you can increase query parallelism by splitting up your input files.
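As a sketch of this, with hypothetical URIs and columns, listing several smaller files in the EXTERN input source lets
different worker tasks read them in parallel, whereas a single large file is read by only one worker:

```sql
-- Hypothetical example: two input files that can be assigned to different
-- worker tasks. A single large file would not be split across workers.
SELECT "timestamp", "page"
FROM TABLE(
  EXTERN(
    '{"type": "http", "uris": ["https://example.com/data/part-000.json.gz", "https://example.com/data/part-001.json.gz"]}',
    '{"type": "json"}',
    '[{"name": "timestamp", "type": "string"}, {"name": "page", "type": "string"}]'
  )
)
```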
The `druid.worker.capacity` server property on each [Middle Manager](../design/architecture.md#druid-services)
determines the maximum number of worker tasks that can run on each server at once. Because worker tasks run
single-threaded, this property also caps the number of processing threads on that server that can contribute to
multi-stage queries.
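For example, a Middle Manager configured with `druid.worker.capacity = 10` (a hypothetical value) runs at most 10
single-threaded worker tasks at once, and therefore contributes at most 10 processing threads to multi-stage queries at
any given time.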
### Memory usage
Increasing the amount of available memory can improve performance in certain cases:
- Segment generation becomes more efficient when data doesn't spill to disk as often.
- Sorting stage output data becomes more efficient since available memory affects the
number of required sorting passes.
Worker tasks use both JVM heap memory and off-heap ("direct") memory.
On Peons launched by Middle Managers, the bulk of the JVM heap (75%) is split up into two bundles of equal size: one
processor bundle and one worker bundle. Each one comprises 37.5% of the available JVM heap.
The processor memory bundle is used for query processing and segment generation. Each processor bundle must also
provide space to buffer I/O between stages. Specifically, each downstream stage requires 1 MB of buffer space for each
upstream worker. For example, if you have 100 workers running in stage 0, and stage 1 reads from stage 0, then each
worker in stage 1 requires 1 MB * 100 = 100 MB of memory for frame buffers.
The worker memory bundle is used for sorting stage output data prior to shuffle. Workers can sort more data than fits in
memory; in this case, they will switch to using disk.
Worker tasks also use off-heap ("direct") memory. Set the amount of direct memory available (`-XX:MaxDirectMemorySize`)
to at least `(druid.processing.numThreads + 1) * druid.processing.buffer.sizeBytes`. Increasing the amount of direct
memory available beyond the minimum does not speed up processing.
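For example, with hypothetical settings of `druid.processing.numThreads = 4` and
`druid.processing.buffer.sizeBytes = 500000000` (500 MB), the minimum direct memory is (4 + 1) * 500 MB = 2.5 GB.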
### Disk usage
Worker tasks use local disk for four purposes:
- Temporary copies of input data. Each temporary file is deleted before the next one is read. You only need
enough temporary disk space to store one input file at a time per task.
- Temporary data related to segment generation. You only need enough temporary disk space to store one segment's worth
of data at a time per task. This is generally less than 2 GB per task.
- External sort of data prior to shuffle. Requires enough space to store a compressed copy of the entire output dataset
for a task.
- Storing stage output data during a shuffle. Requires enough space to store a compressed copy of the entire output
dataset for a task.
Workers use the task working directory, given by
[`druid.indexer.task.baseDir`](../configuration/index.md#additional-peon-configuration), for all of these purposes. Make
sure this directory has enough space available for them.