[Docs] Improvements to JSON-based batch Ingestion page (#15286)

This commit is contained in:
Katya Macedo 2023-10-31 16:50:45 -05:00 committed by GitHub
parent 87695410ac
commit a43ffbdf2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 159 additions and 130 deletions

View File

@ -27,7 +27,7 @@ sidebar_label: JSON-based batch
This page describes JSON-based batch ingestion using [ingestion specs](ingestion-spec.md). For SQL-based batch ingestion using the [`druid-multi-stage-query`](../multi-stage-query/index.md) extension, see [SQL-based ingestion](../multi-stage-query/index.md). Refer to the [ingestion methods](../ingestion/index.md#batch) table to determine which ingestion method is right for you.
:::
Apache Druid supports the following types of native batch indexing tasks:
Apache Druid supports the following types of JSON-based batch indexing tasks:
- Parallel task indexing (`index_parallel`) that can run multiple indexing tasks concurrently. Parallel task works well for production ingestion tasks.
- Simple task indexing (`index`) that run a single indexing task at a time. Simple task indexing is suitable for development and test environments.
@ -35,72 +35,92 @@ This topic covers the configuration for `index_parallel` ingestion specs.
For related information on batch indexing, see:
- [Batch ingestion method comparison table](./index.md#batch) for a comparison of batch ingestion methods.
- [Tutorial: Loading a file](../tutorials/tutorial-batch.md) for a tutorial on native batch ingestion.
- [Tutorial: Loading a file](../tutorials/tutorial-batch.md) for a tutorial on JSON-based batch ingestion.
- [Input sources](./input-sources.md) for possible input sources.
- [Source input formats](./data-formats.md#input-format) for possible input formats.
## Submit an indexing task
To run either kind of native batch indexing task you can:
To run either kind of JSON-based batch indexing task, you can:
- Use the **Load Data** UI in the web console to define and submit an ingestion spec.
- Define an ingestion spec in JSON based upon the [examples](#parallel-indexing-example) and reference topics for batch indexing. Then POST the ingestion spec to the [Tasks API endpoint](../api-reference/tasks-api.md),
`/druid/indexer/v1/task`, the Overlord service. Alternatively you can use the indexing script included with Druid at `bin/post-index-task`.
- Define an ingestion spec in JSON based upon the [examples](#parallel-indexing-example) and reference topics for batch indexing. Then POST the ingestion spec to the [Tasks API endpoint](../api-reference/tasks-api.md), `/druid/indexer/v1/task`, the Overlord service. Alternatively, you can use the indexing script included with Druid at `bin/post-index-task`.
## Parallel task indexing
The parallel task type `index_parallel` is a task for multi-threaded batch indexing. Parallel task indexing only relies on Druid resources. It does not depend on other external systems like Hadoop.
The parallel task type `index_parallel` is a task for multi-threaded batch indexing. Parallel task indexing only relies on Druid resources. It doesn't depend on other external systems like Hadoop.
The `index_parallel` task is a supervisor task that orchestrates
the whole indexing process. The supervisor task splits the input data and creates worker tasks to process the individual portions of data.
Druid issues the worker tasks to the Overlord. The overlord schedules and runs the workers on MiddleManagers or Indexers. After a worker task successfully processes the assigned input portion, it reports the resulting segment list to the supervisor task.
Druid issues the worker tasks to the Overlord. The Overlord schedules and runs the workers on MiddleManagers or Indexers. After a worker task successfully processes the assigned input portion, it reports the resulting segment list to the Supervisor task.
The supervisor task periodically checks the status of worker tasks. If a task fails, the supervisor retries the task until the number of retries reaches the configured limit. If all worker tasks succeed, it publishes the reported segments at once and finalizes ingestion.
The Supervisor task periodically checks the status of worker tasks. If a task fails, the Supervisor retries the task until the number of retries reaches the configured limit. If all worker tasks succeed, it publishes the reported segments at once and finalizes ingestion.
The detailed behavior of the parallel task is different depending on the `partitionsSpec`. See [`partitionsSpec`](#partitionsspec) for more details.
Parallel tasks require:
- a splittable [`inputSource`](#splittable-input-sources) in the `ioConfig`. For a list of supported splittable input formats, see [Splittable input sources](#splittable-input-sources).
- the `maxNumConcurrentSubTasks` greater than 1 in the `tuningConfig`. Otherwise tasks run sequentially. The `index_parallel` task reads each input file one by one and creates segments by itself.
Parallel tasks require the following:
- A splittable [`inputSource`](#splittable-input-sources) in the `ioConfig`. For a list of supported splittable input formats, see [Splittable input sources](#splittable-input-sources).
- The `maxNumConcurrentSubTasks` greater than 1 in the `tuningConfig`. Otherwise tasks run sequentially. The `index_parallel` task reads each input file one by one and creates segments by itself.
### Supported compression formats
Native batch ingestion supports the following compression formats:
JSON-based batch ingestion supports the following compression formats:
- `bz2`
- `gz`
- `xz`
- `zip`
- `sz` (Snappy)
- `zst` (ZSTD).
- `zst` (ZSTD)
### Implementation considerations
This section covers implementation details to consider when you implement parallel task ingestion.
#### Volume control for worker tasks
You can control the amount of input data each worker task processes using different configurations depending on the phase in parallel ingestion.
- See [`partitionsSpec`](#partitionsspec) for details about how partitioning affects data volume for tasks.
- For the tasks that read data from the `inputSource`, you can set the [Split hint spec](#split-hint-spec) in the `tuningConfig`.
- For the task that merge shuffled segments, you can set the `totalNumMergeTasks` in the `tuningConfig`.
See [`partitionsSpec`](#partitionsspec) for details about how partitioning affects data volume for tasks.
For the tasks that read data from the `inputSource`, you can set the [Split hint spec](#split-hint-spec) in the `tuningConfig`.
For the task that merge shuffled segments, you can set the `totalNumMergeTasks` in the `tuningConfig`.
#### Number of running tasks
The `maxNumConcurrentSubTasks` in the `tuningConfig` determines the number of concurrent worker tasks that run in parallel. The supervisor task checks the number of current running worker tasks and creates more if it's smaller than `maxNumConcurrentSubTasks` regardless of the number of available task slots. This may affect to other ingestion performance. See [Capacity planning](#capacity-planning) section for more details.
The `maxNumConcurrentSubTasks` in the `tuningConfig` determines the number of concurrent worker tasks that run in parallel. The Supervisor task checks the number of current running worker tasks and creates more if it's smaller than `maxNumConcurrentSubTasks` regardless of the number of available task slots. This may affect to other ingestion performance. See [Capacity planning](#capacity-planning) section for more details.
#### Replacing or appending data
By default, batch ingestion replaces all data in the intervals in your `granularitySpec` for any segment that it writes to. If you want to add to the segment instead, set the `appendToExisting` flag in the `ioConfig`. Batch ingestion only replaces data in segments where it actively adds data. If there are segments in the intervals for your `granularitySpec` that have do not have data from a task, they remain unchanged. If any existing segments partially overlap with the intervals in the `granularitySpec`, the portion of those segments outside the interval for the new spec remain visible.
By default, JSON-based batch ingestion replaces all data in the intervals in your `granularitySpec` for any segment that it writes to. If you want to add to the segment instead, set the `appendToExisting` flag in the `ioConfig`. JSON-based batch ingestion only replaces data in segments where it actively adds data. If there are segments in the intervals for your `granularitySpec` that don't have data from a task, they remain unchanged. If any existing segments partially overlap with the intervals in the `granularitySpec`, the portion of those segments outside the interval for the new spec remain visible.
#### Fully replacing existing segments using tombstones
You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to replace all existing segments that start and end within the intervals for your `granularitySpec`. This applies whether or not the new data covers all existing segments. `dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`. WARNING: this functionality is still in beta.
:::info
This feature is still experimental.
:::
You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to replace all existing segments that start and end within the intervals for your `granularitySpec`. This applies whether or not the new data covers all existing segments. `dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`.
The following examples demonstrate when to set the `dropExisting` property to true in the `ioConfig`:
Consider an existing segment with an interval of 2020-01-01 to 2021-01-01 and `YEAR` `segmentGranularity`. You want to overwrite the whole interval of 2020-01-01 to 2021-01-01 with new data using the finer segmentGranularity of `MONTH`. If the replacement data does not have a record within every months from 2020-01-01 to 2021-01-01 Druid cannot drop the original `YEAR` segment even if it does include all the replacement data. Set `dropExisting` to true in this case to replace the original segment at `YEAR` `segmentGranularity` since you no longer need it.<br /><br />
Imagine you want to re-ingest or overwrite a datasource and the new data does not contain some time intervals that exist in the datasource. For example, a datasource contains the following data at `MONTH` segmentGranularity:
Consider an existing segment with an interval of 2020-01-01 to 2021-01-01 and `YEAR` `segmentGranularity`. You want to overwrite the whole interval of 2020-01-01 to 2021-01-01 with new data using the finer segmentGranularity of `MONTH`. If the replacement data does not have a record within every months from 2020-01-01 to 2021-01-01 Druid cannot drop the original `YEAR` segment even if it does include all the replacement data. Set `dropExisting` to true in this case to replace the original segment at `YEAR` `segmentGranularity` since you no longer need it.
Imagine you want to re-ingest or overwrite a datasource and the new data does not contain some time intervals that exist in the datasource. For example, a datasource contains the following data at `MONTH` `segmentGranularity`:
- **January**: 1 record
- **February**: 10 records
- **March**: 10 records
You want to re-ingest and overwrite with new data as follows:
You want to re-ingest and overwrite with new data as follows:
- **January**: 0 records
- **February**: 10 records
- **March**: 9 records
Unless you set `dropExisting` to true, the result after ingestion with overwrite using the same MONTH segmentGranularity would be:
Unless you set `dropExisting` to true, the result after ingestion with overwrite using the same `MONTH` `segmentGranularity` would be:
* **January**: 1 record
* **February**: 10 records
* **March**: 9 records
@ -109,7 +129,10 @@ This may not be what it is expected since the new data has 0 records for January
## Parallel indexing example
The following example illustrates the configuration for a parallel indexing task:
The following example illustrates the configuration for a parallel indexing task.
<details>
<summary>Click to view the example</summary>
```json
{
@ -188,65 +211,70 @@ The following example illustrates the configuration for a parallel indexing task
}
}
```
</details>
## Parallel indexing configuration
The following table defines the primary sections of the input spec:
|property|description|required?|
|--------|-----------|---------|
|type|The task type. For parallel task indexing, set the value to `index_parallel`.|yes|
|id|The task ID. If omitted, Druid generates the task ID using the task type, data source name, interval, and date-time stamp. |no|
|spec|The ingestion spec that defines the [data schema](#dataschema), [IO config](#ioconfig), and [tuning config](#tuningconfig).|yes|
|context|Context to specify various task configuration parameters. See [Task context parameters](../ingestion/tasks.md#context-parameters) for more details.|no|
|Property|Description|Required|
|--------|-----------|--------|
|`type`|The task type. For parallel task indexing, set the value to `index_parallel`.|yes|
|`id`|The task ID. If omitted, Druid generates the task ID using the task type, data source name, interval, and date-time stamp.|no|
|`spec`|The ingestion spec that defines the [data schema](#dataschema), [IO config](#ioconfig), and [tuning config](#tuningconfig).|yes|
|`context`|Context to specify various task configuration parameters. See [Task context parameters](../ingestion/tasks.md#context-parameters) for more details.|no|
### `dataSchema`
This field is required. In general, it defines the way that Druid will store your data: the primary timestamp column, the dimensions, metrics, and any transformations. For an overview, see [Ingestion Spec DataSchema](../ingestion/ingestion-spec.md#dataschema).
This field is required. In general, it defines the way that Druid stores your data: the primary timestamp column, the dimensions, metrics, and any transformations. For an overview, see [Ingestion Spec DataSchema](../ingestion/ingestion-spec.md#dataschema).
When defining the `granularitySpec` for index parallel, consider the defining `intervals` explicitly if you know the time range of the data. This way locking failure happens faster and Druid won't accidentally replace data outside the interval range some rows contain unexpected timestamps. The reasoning is as follows:
- If you explicitly define `intervals`, batch ingestion locks all intervals specified when it starts up. Problems with locking become evident quickly when multiple ingestion or indexing tasks try to obtain a lock on the same interval. For example, if a Kafka ingestion task tries to obtain a lock on a locked interval causing the ingestion task fail. Furthermore, if there are rows outside the specified intervals, Druid drops them, avoiding conflict with unexpected intervals.
- If you do not define `intervals`, batch ingestion locks each interval when the interval is discovered. In this case if the task overlaps with a higher-priority task, issues with conflicting locks occur later in the ingestion process. Also if the source data includes rows with unexpected timestamps, they may caused unexpected locking of intervals.
- If you explicitly define `intervals`, JSON-based batch ingestion locks all intervals specified when it starts up. Problems with locking become evident quickly when multiple ingestion or indexing tasks try to obtain a lock on the same interval. For example, if a Kafka ingestion task tries to obtain a lock on a locked interval causing the ingestion task fail. Furthermore, if there are rows outside the specified intervals, Druid drops them, avoiding conflict with unexpected intervals.
- If you don't define `intervals`, JSON-based batch ingestion locks each interval when the interval is discovered. In this case, if the task overlaps with a higher-priority task, issues with conflicting locks occur later in the ingestion process. If the source data includes rows with unexpected timestamps, they may caused unexpected locking of intervals.
### `ioConfig`
|property|description|default|required?|
The following table lists the properties of a `ioConfig` object:
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|The task type. Set to the value to `index_parallel`.|none|yes|
|inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task replaces all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not change any existing segment. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not replace any segments even if `dropExisting` is `true`. WARNING: this functionality is still in beta.|false|no|
|`type`|The task type. Set to the value to `index_parallel`.|none|yes|
|`inputFormat`|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
|`appendToExisting`|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
|`dropExisting`|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task replaces all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid doesn't change any existing segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` isn't specified in `granularitySpec`, Druid doesn't replace any segments even if `dropExisting` is `true`. WARNING: this feature is still experimental.|false|no|
### `tuningConfig`
The tuningConfig is optional and default parameters will be used if no tuningConfig is specified. See below for more details.
The `tuningConfig` is optional. Druid uses default parameters if `tuningConfig` is not specified.
|property|description|default|required?|
The following table lists the properties of a `tuningConfig` object:
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|The task type. Set the value to`index_parallel`.|none|yes|
|maxRowsInMemory|Determines when Druid should perform intermediate persists to disk. Normally you do not need to set this. Depending on the nature of your data, if rows are short in terms of bytes. For example, you may not want to store a million rows in memory. In this case, set this value.|1000000|no|
|maxBytesInMemory|Use to determine when Druid should perform intermediate persists to disk. Normally Druid computes this internally and you do not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). Note that `maxBytesInMemory` also includes heap usage of artifacts created from intermediary persists. This means that after every persist, the amount of `maxBytesInMemory` until next persist will decrease. Tasks fail when the sum of bytes of all intermediary persisted artifacts exceeds `maxBytesInMemory`.|1/6 of max JVM memory|no|
|maxColumnsToMerge|Limit of the number of segments to merge in a single phase when merging segments for publishing. This limit affects the total number of columns present in a set of segments to merge. If the limit is exceeded, segment merging occurs in multiple phases. Druid merges at least 2 segments per phase, regardless of this setting.|-1 (unlimited)|no|
|maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting to be pushed. Used to determine when intermediate pushing should occur.|20000000|no|
|numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create when using a `hashed` `partitionsSpec`. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|no|
|splitHintSpec|Hint to control the amount of data that each first phase task reads. Druid may ignore the hint depending on the implementation of the input source. See [Split hint spec](#split-hint-spec) for more details.|size-based split hint spec|no|
|partitionsSpec|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic` if `forceGuaranteedRollup` = false, `hashed` or `single_dim` if `forceGuaranteedRollup` = true|no|
|indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](ingestion-spec.md#indexspec)|null|no|
|indexSpecForIntermediatePersists|Defines segment storage format options to use at indexing time for intermediate persisted temporary segments. You can use this configuration to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, if you disable compression on intermediate segments, page cache use my increase while intermediate segments are used before Druid merges them to the final published segment published. See [IndexSpec](./ingestion-spec.md#indexspec) for possible values.|same as indexSpec|no|
|maxPendingPersists|Maximum number of pending persists that remain not started. If a new intermediate persist exceeds this limit, ingestion blocks until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|forceGuaranteedRollup|Forces [perfect rollup](rollup.md). The perfect rollup optimizes the total size of generated segments and querying time but increases indexing time. If true, specify `intervals` in the `granularitySpec` and use either `hashed` or `single_dim` for the `partitionsSpec`. You cannot use this flag in conjunction with `appendToExisting` of IOConfig. For more details, see [Segment pushing modes](#segment-pushing-modes).|false|no|
|reportParseExceptions|If true, Druid throws exceptions encountered during parsing and halts ingestion. If false, Druid skips unparseable rows and fields.|false|no|
|pushTimeout|Milliseconds to wait to push segments. Must be >= 0, where 0 means to wait forever.|0|no|
|segmentWriteOutMediumFactory|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentwriteoutmediumfactory).|If not specified, uses the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` |no|
|maxNumConcurrentSubTasks|Maximum number of worker tasks that can be run in parallel at the same time. The supervisor task spawns worker tasks up to `maxNumConcurrentSubTasks` regardless of the current available task slots. If this value is 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set to too large, the supervisor may create too many worker tasks that block other ingestion tasks. See [Capacity planning](#capacity-planning) for more details.|1|no|
|maxRetry|Maximum number of retries on task failures.|3|no|
|maxNumSegmentsToMerge|Max limit for the number of segments that a single task can merge at the same time in the second phase. Used only when `forceGuaranteedRollup` is true.|100|no|
|totalNumMergeTasks|Total number of tasks that merge segments in the merge phase when `partitionsSpec` is set to `hashed` or `single_dim`.|10|no|
|taskStatusCheckPeriodMs|Polling period in milliseconds to check running task statuses.|1000|no|
|chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no|
|chatHandlerNumRetries|Retries for reporting the pushed segments in worker tasks.|5|no|
|awaitSegmentAvailabilityTimeoutMillis|Long|Milliseconds to wait for the newly indexed segments to become available for query after ingestion completes. If `<= 0`, no wait occurs. If `> 0`, the task waits for the Coordinator to indicate that the new segments are available for querying. If the timeout expires, the task exits as successful, but the segments are not confirmed as available for query.|no (default = 0)|
|`type`|The task type. Set the value to`index_parallel`.|none|yes|
|`maxRowsInMemory`|Determines when Druid should perform intermediate persists to disk. Normally you don't need to set this. Depending on the nature of your data, if rows are short in terms of bytes. For example, you may not want to store a million rows in memory. In this case, set this value.|1000000|no|
|`maxBytesInMemory`|Use to determine when Druid should perform intermediate persists to disk. Normally Druid computes this internally and you don't need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is `maxBytesInMemory * (2 + maxPendingPersists)`. Note that `maxBytesInMemory` also includes heap usage of artifacts created from intermediary persists. This means that after every persist, the amount of `maxBytesInMemory` until next persist will decrease. Tasks fail when the sum of bytes of all intermediary persisted artifacts exceeds `maxBytesInMemory`.|1/6 of max JVM memory|no|
|`maxColumnsToMerge`|Limit of the number of segments to merge in a single phase when merging segments for publishing. This limit affects the total number of columns present in a set of segments to merge. If the limit is exceeded, segment merging occurs in multiple phases. Druid merges at least 2 segments per phase, regardless of this setting.|-1 (unlimited)|no|
|`maxTotalRows`|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting to be pushed. Used to determine when intermediate pushing should occur.|20000000|no|
|`numShards`|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create when using a `hashed` `partitionsSpec`. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data.|null|no|
|`splitHintSpec`|Hint to control the amount of data that each first phase task reads. Druid may ignore the hint depending on the implementation of the input source. See [Split hint spec](#split-hint-spec) for more details.|size-based split hint spec|no|
|`partitionsSpec`|Defines how to partition data in each timeChunk, see [PartitionsSpec](#partitionsspec).|`dynamic` if `forceGuaranteedRollup` = false, `hashed` or `single_dim` if `forceGuaranteedRollup` = true|no|
|`indexSpec`|Defines segment storage format options to be used at indexing time, see [IndexSpec](ingestion-spec.md#indexspec).|null|no|
|`indexSpecForIntermediatePersists`|Defines segment storage format options to use at indexing time for intermediate persisted temporary segments. You can use this configuration to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, if you disable compression on intermediate segments, page cache use my increase while intermediate segments are used before Druid merges them to the final published segment published. See [IndexSpec](./ingestion-spec.md#indexspec) for possible values.|same as `indexSpec`|no|
|`maxPendingPersists`|Maximum number of pending persists that remain not started. If a new intermediate persist exceeds this limit, ingestion blocks until the currently-running persist finishes. Maximum heap memory usage for indexing scales with `maxRowsInMemory * (2 + maxPendingPersists)`.|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|`forceGuaranteedRollup`|Forces [perfect rollup](rollup.md). The perfect rollup optimizes the total size of generated segments and querying time but increases indexing time. If true, specify `intervals` in the `granularitySpec` and use either `hashed` or `single_dim` for the `partitionsSpec`. You cannot use this flag in conjunction with `appendToExisting` of `IOConfig`. For more details, see [Segment pushing modes](#segment-pushing-modes).|false|no|
|`reportParseExceptions`|If true, Druid throws exceptions encountered during parsing and halts ingestion. If false, Druid skips unparseable rows and fields.|false|no|
|`pushTimeout`|Milliseconds to wait to push segments. Must be >= 0, where 0 means to wait forever.|0|no|
|`segmentWriteOutMediumFactory`|Segment write-out medium to use when creating segments. See [SegmentWriteOutMediumFactory](#segmentwriteoutmediumfactory).|If not specified, uses the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` |no|
|`maxNumConcurrentSubTasks`|Maximum number of worker tasks that can be run in parallel at the same time. The supervisor task spawns worker tasks up to `maxNumConcurrentSubTasks` regardless of the current available task slots. If this value is 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If this value is set to too large, the supervisor may create too many worker tasks that block other ingestion tasks. See [Capacity planning](#capacity-planning) for more details.|1|no|
|`maxRetry`|Maximum number of retries on task failures.|3|no|
|`maxNumSegmentsToMerge`|Max limit for the number of segments that a single task can merge at the same time in the second phase. Used only when `forceGuaranteedRollup` is true.|100|no|
|`totalNumMergeTasks`|Total number of tasks that merge segments in the merge phase when `partitionsSpec` is set to `hashed` or `single_dim`.|10|no|
|`taskStatusCheckPeriodMs`|Polling period in milliseconds to check running task statuses.|1000|no|
|`chatHandlerTimeout`|Timeout for reporting the pushed segments in worker tasks.|PT10S|no|
|`chatHandlerNumRetries`|Retries for reporting the pushed segments in worker tasks.|5|no|
|`awaitSegmentAvailabilityTimeoutMillis`|Milliseconds to wait for the newly indexed segments to become available for query after ingestion completes. If `<= 0`, no wait occurs. If `> 0`, the task waits for the Coordinator to indicate that the new segments are available for querying. If the timeout expires, the task exits as successful, but the segments are not confirmed as available for query.|Long|no (default = 0)|
### Split Hint Spec
@ -256,25 +284,28 @@ The split hint spec is used to help the supervisor task divide input sources. Ea
The size-based split hint spec affects all splittable input sources except for the HTTP input source and SQL input source.
|property|description|default|required?|
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set the value to `maxSize`.|none|yes|
|maxSplitSize|Maximum number of bytes of input files to process in a single subtask. If a single file is larger than the limit, Druid processes the file alone in a single subtask. Druid does not split files across tasks. One subtask will not process more files than `maxNumFiles` even when their total size is smaller than `maxSplitSize`. [Human-readable format](../configuration/human-readable-byte.md) is supported.|1GiB|no|
|maxNumFiles|Maximum number of input files to process in a single subtask. This limit avoids task failures when the ingestion spec is too long. There are two known limits on the max size of serialized ingestion spec: the max ZNode size in ZooKeeper (`jute.maxbuffer`) and the max packet size in MySQL (`max_allowed_packet`). These limits can cause ingestion tasks fail if the serialized ingestion spec size hits one of them. One subtask will not process more data than `maxSplitSize` even when the total number of files is smaller than `maxNumFiles`.|1000|no|
|`type`|Set the value to `maxSize`.|none|yes|
|`maxSplitSize`|Maximum number of bytes of input files to process in a single subtask. If a single file is larger than the limit, Druid processes the file alone in a single subtask. Druid does not split files across tasks. One subtask will not process more files than `maxNumFiles` even when their total size is smaller than `maxSplitSize`. [Human-readable format](../configuration/human-readable-byte.md) is supported.|1GiB|no|
|`maxNumFiles`|Maximum number of input files to process in a single subtask. This limit avoids task failures when the ingestion spec is too long. There are two known limits on the max size of serialized ingestion spec: the max ZNode size in ZooKeeper (`jute.maxbuffer`) and the max packet size in MySQL (`max_allowed_packet`). These limits can cause ingestion tasks fail if the serialized ingestion spec size hits one of them. One subtask will not process more data than `maxSplitSize` even when the total number of files is smaller than `maxNumFiles`.|1000|no|
#### Segments Split Hint Spec
The segments split hint spec is used only for [`DruidInputSource`](./input-sources.md).
|property|description|default|required?|
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set the value to `segments`.|none|yes|
|maxInputSegmentBytesPerTask|Maximum number of bytes of input segments to process in a single subtask. If a single segment is larger than this number, Druid processes the segment alone in a single subtask. Druid never splits input segments across tasks. A single subtask will not process more segments than `maxNumSegments` even when their total size is smaller than `maxInputSegmentBytesPerTask`. [Human-readable format](../configuration/human-readable-byte.md) is supported.|1GiB|no|
|maxNumSegments|Maximum number of input segments to process in a single subtask. This limit avoids failures due to the the ingestion spec being too long. There are two known limits on the max size of serialized ingestion spec: the max ZNode size in ZooKeeper (`jute.maxbuffer`) and the max packet size in MySQL (`max_allowed_packet`). These limits can make ingestion tasks fail when the serialized ingestion spec size hits one of them. A single subtask will not process more data than `maxInputSegmentBytesPerTask` even when the total number of segments is smaller than `maxNumSegments`.|1000|no|
|`type`|Set the value to `segments`.|none|yes|
|`maxInputSegmentBytesPerTask`|Maximum number of bytes of input segments to process in a single subtask. If a single segment is larger than this number, Druid processes the segment alone in a single subtask. Druid never splits input segments across tasks. A single subtask will not process more segments than `maxNumSegments` even when their total size is smaller than `maxInputSegmentBytesPerTask`. [Human-readable format](../configuration/human-readable-byte.md) is supported.|1GiB|no|
|`maxNumSegments`|Maximum number of input segments to process in a single subtask. This limit avoids failures due to the the ingestion spec being too long. There are two known limits on the max size of serialized ingestion spec: the max ZNode size in ZooKeeper (`jute.maxbuffer`) and the max packet size in MySQL (`max_allowed_packet`). These limits can make ingestion tasks fail when the serialized ingestion spec size hits one of them. A single subtask will not process more data than `maxInputSegmentBytesPerTask` even when the total number of segments is smaller than `maxNumSegments`.|1000|no|
### `partitionsSpec`
The primary partition for Druid is time. You can define a secondary partitioning method in the partitions spec. Use the `partitionsSpec` type that applies for your [rollup](rollup.md) method. For perfect rollup, you can use:
The primary partition for Druid is time. You can define a secondary partitioning method in the partitions spec. Use the `partitionsSpec` type that applies for your [rollup](rollup.md) method.
For perfect rollup, you can use:
- `hashed` partitioning based on the hash value of specified dimensions for each row
- `single_dim` based on ranges of values for a single dimension
- `range` based on ranges of values of multiple dimensions.
@ -294,14 +325,14 @@ The `partitionsSpec` types have different characteristics.
#### Dynamic partitioning
|property|description|default|required?|
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set the value to `dynamic`.|none|yes|
|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
|maxTotalRows|Total number of rows across all segments waiting for being pushed. Used in determining when intermediate segment push should occur.|20000000|no|
|`type`|Set the value to `dynamic`.|none|yes|
|`maxRowsPerSegment`|Used in sharding. Determines how many rows are in each segment.|5000000|no|
|`maxTotalRows`|Total number of rows across all segments waiting for being pushed. Used in determining when intermediate segment push should occur.|20000000|no|
With the dynamic partitioning, the parallel index task runs in a single phase spawning multiple worker tasks (type `single_phase_sub_task`), each of which creates segments.
With the Dynamic partitioning, the parallel index task runs in a single phase:
it spawns multiple worker tasks (type `single_phase_sub_task`), each of which creates segments.
How the worker task creates segments:
- Whenever the number of rows in the current segment exceeds
@ -310,29 +341,32 @@ How the worker task creates segments:
#### Hash-based partitioning
|property|description|default|required?|
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set the value to `hashed`.|none|yes|
|numShards|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. This property and `targetRowsPerSegment` cannot both be set.|none|no|
|targetRowsPerSegment|A target row count for each partition. If `numShards` is left unspecified, the Parallel task will determine a partition count automatically such that each partition has a row count close to the target, assuming evenly distributed keys in the input data. A target per-segment row count of 5 million is used if both `numShards` and `targetRowsPerSegment` are null. |null (or 5,000,000 if both `numShards` and `targetRowsPerSegment` are null)|no|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
|partitionFunction|A function to compute hash of partition dimensions. See [Hash partition function](#hash-partition-function)|`murmur3_32_abs`|no|
|`type`|Set the value to `hashed`.|none|yes|
|`numShards`|Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. This property and `targetRowsPerSegment` cannot both be set.|none|no|
|`targetRowsPerSegment`|A target row count for each partition. If `numShards` is left unspecified, the Parallel task will determine a partition count automatically such that each partition has a row count close to the target, assuming evenly distributed keys in the input data. A target per-segment row count of 5 million is used if both `numShards` and `targetRowsPerSegment` are null. |null (or 5,000,000 if both `numShards` and `targetRowsPerSegment` are null)|no|
|`partitionDimensions`|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
|`partitionFunction`|A function to compute hash of partition dimensions. See [Hash partition function](#hash-partition-function)|`murmur3_32_abs`|no|
The Parallel task with hash-based partitioning is similar to [MapReduce](https://en.wikipedia.org/wiki/MapReduce).
The task runs in up to 3 phases: `partial dimension cardinality`, `partial segment generation` and `partial segment merge`.
- The `partial dimension cardinality` phase is an optional phase that only runs if `numShards` is not specified.
The task runs in up to three phases: `partial dimension cardinality`, `partial segment generation` and `partial segment merge`.
The `partial dimension cardinality` phase is an optional phase that only runs if `numShards` is not specified.
The Parallel task splits the input data and assigns them to worker tasks based on the split hint spec.
Each worker task (type `partial_dimension_cardinality`) gathers estimates of partitioning dimensions cardinality for
each time chunk. The Parallel task will aggregate these estimates from the worker tasks and determine the highest
cardinality across all of the time chunks in the input data, dividing this cardinality by `targetRowsPerSegment` to
automatically determine `numShards`.
- In the `partial segment generation` phase, just like the Map phase in MapReduce,
In the `partial segment generation` phase, just like the Map phase in MapReduce,
the Parallel task splits the input data based on the split hint spec
and assigns each split to a worker task. Each worker task (type `partial_index_generate`) reads the assigned split, and partitions rows by the time chunk from `segmentGranularity` (primary partition key) in the `granularitySpec`
and then by the hash value of `partitionDimensions` (secondary partition key) in the `partitionsSpec`.
The partitioned data is stored in local storage of
the [middleManager](../design/middlemanager.md) or the [indexer](../design/indexer.md).
- The `partial segment merge` phase is similar to the Reduce phase in MapReduce.
The `partial segment merge` phase is similar to the Reduce phase in MapReduce.
The Parallel task spawns a new set of worker tasks (type `partial_index_generic_merge`) to merge the partitioned data created in the previous phase. Here, the partitioned data is shuffled based on
the time chunk and the hash value of `partitionDimensions` to be merged; each worker task reads the data falling in the same time chunk and the same hash value from multiple MiddleManager/Indexer processes and merges them to create the final segments. Finally, they push the final segments to the deep storage at once.
@ -360,33 +394,36 @@ When you use this technique to partition your data, segment sizes may be unequal
Range partitioning is not possible on multi-value dimensions. If the provided
`partitionDimension` is multi-value, your ingestion job will report an error.
|property|description|default|required?|
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set the value to `single_dim`.|none|yes|
|partitionDimension|The dimension to partition on. Only rows with a single dimension value are allowed.|none|yes|
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|none|either this or `maxRowsPerSegment`|
|maxRowsPerSegment|Soft max for the number of rows to include in a partition.|none|either this or `targetRowsPerSegment`|
|assumeGrouped|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|false|no|
|`type`|Set the value to `single_dim`.|none|yes|
|`partitionDimension`|The dimension to partition on. Only rows with a single dimension value are allowed.|none|yes|
|`targetRowsPerSegment`|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|none|either this or `maxRowsPerSegment`|
|`maxRowsPerSegment`|Soft max for the number of rows to include in a partition.|none|either this or `targetRowsPerSegment`|
|`assumeGrouped`|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|false|no|
With `single-dim` partitioning, the Parallel task runs in 3 phases,
i.e., `partial dimension distribution`, `partial segment generation`, and `partial segment merge`.
The first phase is to collect some statistics to find
the best partitioning and the other 2 phases are to create partial segments
and to merge them, respectively, as in hash-based partitioning.
- In the `partial dimension distribution` phase, the Parallel task splits the input data and
In the `partial dimension distribution` phase, the Parallel task splits the input data and
assigns them to worker tasks based on the split hint spec. Each worker task (type `partial_dimension_distribution`) reads
the assigned split and builds a histogram for `partitionDimension`.
The Parallel task collects those histograms from worker tasks and finds
the best range partitioning based on `partitionDimension` to evenly
distribute rows across partitions. Note that either `targetRowsPerSegment`
or `maxRowsPerSegment` will be used to find the best partitioning.
- In the `partial segment generation` phase, the Parallel task spawns new worker tasks (type `partial_range_index_generate`)
In the `partial segment generation` phase, the Parallel task spawns new worker tasks (type `partial_range_index_generate`)
to create partitioned data. Each worker task reads a split created as in the previous phase,
partitions rows by the time chunk from the `segmentGranularity` (primary partition key) in the `granularitySpec`
and then by the range partitioning found in the previous phase.
The partitioned data is stored in local storage of
the [middleManager](../design/middlemanager.md) or the [indexer](../design/indexer.md).
- In the `partial segment merge` phase, the parallel index task spawns a new set of worker tasks (type `partial_index_generic_merge`) to merge the partitioned
In the `partial segment merge` phase, the parallel index task spawns a new set of worker tasks (type `partial_index_generic_merge`) to merge the partitioned
data created in the previous phase. Here, the partitioned data is shuffled based on
the time chunk and the value of `partitionDimension`; each worker task reads the segments
falling in the same partition of the same range from multiple MiddleManager/Indexer processes and merges
@ -411,13 +448,13 @@ Druid to distribute segment sizes more evenly, and to prune on more dimensions.
Range partitioning is not possible on multi-value dimensions. If one of the provided
`partitionDimensions` is multi-value, your ingestion job will report an error.
|property|description|default|required?|
|Property|Description|Default|Required|
|--------|-----------|-------|---------|
|type|Set the value to `range`.|none|yes|
|partitionDimensions|An array of dimensions to partition on. Order the dimensions from most frequently queried to least frequently queried. For best results, limit your number of dimensions to between three and five dimensions.|none|yes|
|targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|none|either this or `maxRowsPerSegment`|
|`type`|Set the value to `range`.|none|yes|
|`partitionDimensions`|An array of dimensions to partition on. Order the dimensions from most frequently queried to least frequently queried. For best results, limit your number of dimensions to between three and five dimensions.|none|yes|
|`targetRowsPerSegment`|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|none|either this or `maxRowsPerSegment`|
|maxRowsPerSegment|Soft max for the number of rows to include in a partition.|none|either this or `targetRowsPerSegment`|
|assumeGrouped|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|false|no|
|`assumeGrouped`|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|false|no|
#### Benefits of range partitioning
@ -454,18 +491,15 @@ support pruning.
## HTTP status endpoints
The supervisor task provides some HTTP endpoints to get running status.
The Supervisor task provides some HTTP endpoints to get running status.
* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/mode`
Returns 'parallel' if the indexing task is running in parallel. Otherwise, it returns 'sequential'.
* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/phase`
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/mode`
Returns `parallel` if the indexing task is running in parallel. Otherwise, it returns `sequential`.
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/phase`
Returns the name of the current phase if the task running in the parallel mode.
* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/progress`
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/progress`
Returns the estimated progress of the current phase if the supervisor task is running in the parallel mode.
An example of the result is
@ -481,32 +515,27 @@ An example of the result is
}
```
* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtasks/running`
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtasks/running`
Returns the task IDs of running worker tasks, or an empty list if the supervisor task is running in the sequential mode.
* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs`
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs`
Returns all worker task specs, or an empty list if the supervisor task is running in the sequential mode.
* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/running`
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/running`
Returns running worker task specs, or an empty list if the supervisor task is running in the sequential mode.
* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/complete`
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspecs/complete`
Returns complete worker task specs, or an empty list if the supervisor task is running in the sequential mode.
* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}`
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}`
Returns the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode.
* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/state`
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/state`
Returns the state of the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode.
The returned result contains the worker task spec, a current task status if exists, and task attempt history.
An example of the result is
<details>
<summary>Click to view the response</summary>
```json
{
@ -677,9 +706,9 @@ An example of the result is
"taskHistory": []
}
```
</details>
* `http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/history`
`http://{PEON_IP}:{PEON_PORT}/druid/worker/v1/chat/{SUPERVISOR_TASK_ID}/subtaskspec/{SUB_TASK_SPEC_ID}/history`
Returns the task attempt history of the worker task spec of the given id, or HTTP 404 Not Found error if the supervisor task is running in the sequential mode.
## Segment pushing modes
@ -691,8 +720,8 @@ the parallel task index supports the following segment pushing modes based upon
## Capacity planning
The supervisor task can create up to `maxNumConcurrentSubTasks` worker tasks no matter how many task slots are currently available.
As a result, total number of tasks which can be run at the same time is `(maxNumConcurrentSubTasks + 1)` (including the supervisor task).
The Supervisor task can create up to `maxNumConcurrentSubTasks` worker tasks no matter how many task slots are currently available.
As a result, total number of tasks which can be run at the same time is `(maxNumConcurrentSubTasks + 1)` (including the Supervisor task).
Please note that this can be even larger than total number of task slots (sum of the capacity of all workers).
If `maxNumConcurrentSubTasks` is larger than `n (available task slots)`, then
`maxNumConcurrentSubTasks` tasks are created by the supervisor task, but only `n` tasks would be started.
@ -728,6 +757,6 @@ For information on how to combine input sources, see [Combining input source](./
### `segmentWriteOutMediumFactory`
|Field|Type|Description|Required|
|Property|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|See [Additional Peon Configuration: SegmentWriteOutMediumFactory](../configuration/index.md#segmentwriteoutmediumfactory) for explanation and available options.|yes|
|`type`|String|See [Additional Peon Configuration: SegmentWriteOutMediumFactory](../configuration/index.md#segmentwriteoutmediumfactory) for explanation and available options.|yes|