docs: revise concurrent append and replace (#15760)

Co-authored-by: Victoria Lim <vtlim@users.noreply.github.com>
317brian 2024-02-01 11:03:36 -08:00 committed by GitHub
parent 65857dc0e7
commit 6d617c34d2
4 changed files with 174 additions and 110 deletions

View File

@ -131,16 +131,6 @@ maximize performance and minimize disk usage of the `compact` tasks launched by
For more details on each of the specs in an auto-compaction configuration, see [Automatic compaction dynamic configuration](../configuration/index.md#automatic-compaction-dynamic-configuration).
### Avoid conflicts with ingestion
Compaction tasks may be interrupted when they interfere with ingestion. For example, this occurs when an ingestion task needs to write data to a segment for a time interval locked for compaction. If there are continuous failures that prevent compaction from making progress, consider one of the following strategies:
* Set `skipOffsetFromLatest` to reduce the chance of conflicts between ingestion and compaction. See more details in this section below.
* Increase the priority value of compaction tasks relative to ingestion tasks. Only recommended for advanced users. This approach can cause ingestion jobs to fail or lag. To change the priority of compaction tasks, set `taskPriority` to the desired priority value in the auto-compaction configuration. For details on the priority values of different task types, see [Lock priority](../ingestion/tasks.md#lock-priority).
The Coordinator compacts segments from newest to oldest. In the auto-compaction configuration, you can set a time period, relative to the end time of the most recent segment, for segments that should not be compacted. Assign this value to `skipOffsetFromLatest`. Note that this offset is not relative to the current time but to the latest segment time. For example, if you want to skip over segments from five days prior to the end time of the most recent segment, assign `"skipOffsetFromLatest": "P5D"`.
To set `skipOffsetFromLatest`, consider how frequently you expect the stream to receive late arriving data. If your stream only occasionally receives late arriving data, the auto-compaction system robustly compacts your data even though data is ingested outside the `skipOffsetFromLatest` window. For most realtime streaming ingestion use cases, it is reasonable to set `skipOffsetFromLatest` to a few hours or a day.
### Set frequency of compaction runs
If you want the Coordinator to check for compaction more frequently than its indexing period, create a separate group to handle compaction duties.
@ -152,6 +142,34 @@ druid.coordinator.compaction.duties=["compactSegments"]
druid.coordinator.compaction.period=PT60S
```
## Avoid conflicts with ingestion
Compaction tasks may be interrupted when they interfere with ingestion. For example, this occurs when an ingestion task needs to write data to a segment for a time interval locked for compaction. If there are continuous failures that prevent compaction from making progress, consider one of the following strategies:
* Enable [concurrent append and replace tasks](#enable-concurrent-append-and-replace) on your datasource and on the ingestion tasks.
* Set `skipOffsetFromLatest` to reduce the chance of conflicts between ingestion and compaction. See more details in [Skip compaction for latest segments](#skip-compaction-for-latest-segments).
* Increase the priority value of compaction tasks relative to ingestion tasks. Only recommended for advanced users. This approach can cause ingestion jobs to fail or lag. To change the priority of compaction tasks, set `taskPriority` to the desired priority value in the auto-compaction configuration, as in the sketch after this list. For details on the priority values of different task types, see [Lock priority](../ingestion/tasks.md#lock-priority).
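A minimal sketch of an auto-compaction config that raises the compaction task priority. The datasource name and the value `50` are placeholders, not recommendations:
```json
{
  "dataSource": "YOUR_DATASOURCE",
  "taskPriority": 50
}
```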
### Enable concurrent append and replace
You can use concurrent append and replace to safely replace the existing data in an interval of a datasource while new data is being appended to that interval, even during compaction.
To do this, you need to update your datasource to allow concurrent append and replace tasks:
* If you're using the API, include the following `taskContext` property in your API call: `"useConcurrentLocks": true`
* If you're using the UI, enable **Allow concurrent compactions (experimental)** in the **Compaction config** for your datasource.
You'll also need to update your ingestion jobs for the datasource to include the task context `"useConcurrentLocks": true`.
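For example, the context fragment looks like this in a supervisor or ingestion spec:
```json
"context": {
  "useConcurrentLocks": true
}
```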
For information on how to do this, see [Concurrent append and replace](../ingestion/concurrent-append-replace.md).
### Skip compaction for latest segments
The Coordinator compacts segments from newest to oldest. In the auto-compaction configuration, you can set a time period, relative to the end time of the most recent segment, for segments that should not be compacted. Assign this value to `skipOffsetFromLatest`. Note that this offset is not relative to the current time but to the latest segment time. For example, if you want to skip over segments from five days prior to the end time of the most recent segment, assign `"skipOffsetFromLatest": "P5D"`.
To set `skipOffsetFromLatest`, consider how frequently you expect the stream to receive late arriving data. If your stream only occasionally receives late arriving data, the auto-compaction system robustly compacts your data even though data is ingested outside the `skipOffsetFromLatest` window. For most realtime streaming ingestion use cases, it is reasonable to set `skipOffsetFromLatest` to a few hours or a day.
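As a sketch, an auto-compaction config that skips compaction for the most recent day of segments; the datasource name is a placeholder:
```json
{
  "dataSource": "YOUR_DATASOURCE",
  "skipOffsetFromLatest": "P1D"
}
```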
## View automatic compaction statistics
After the Coordinator has initiated auto-compaction, you can view compaction statistics for the datasource, including the number of bytes, segments, and intervals already compacted and those awaiting compaction. The Coordinator also reports the total bytes, segments, and intervals not eligible for compaction in accordance with its [segment search policy](../design/coordinator.md#segment-search-policy-in-automatic-compaction).
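For example, you can fetch these statistics from the Coordinator's compaction status endpoint. A sketch, assuming the Coordinator runs at `localhost:8081`:
```shell
curl "http://localhost:8081/druid/coordinator/v1/compaction/status?dataSource=YOUR_DATASOURCE"
```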
@ -203,106 +221,6 @@ The following auto-compaction configuration compacts updates to the `wikipedia` seg
}
```
## Concurrent append and replace
:::info
Concurrent append and replace is an [experimental feature](../development/experimental.md) and is not currently available for SQL-based ingestion.
:::
This feature allows you to safely replace the existing data in an interval of a datasource while new data is being appended to that interval. One of the most common applications of this is appending new data (such as with streaming ingestion) to an interval while compaction of that interval is already in progress.
To set up concurrent append and replace, ensure that your ingestion jobs have the appropriate lock types:
- The append task (with `appendToExisting` set to `true`) has `taskLockType` set to `APPEND` in the task context.
- The replace task (with `appendToExisting` set to `false`) has `taskLockType` set to `REPLACE` in the task context.
- The segment granularity of the append task is equal to or finer than the segment granularity of the replace task.
:::info
When using concurrent append and replace, keep the following in mind:
- Concurrent append and replace fails if the task with `APPEND` lock uses a coarser segment granularity than the task with the `REPLACE` lock. For example, if the `APPEND` task uses a segment granularity of YEAR and the `REPLACE` task uses a segment granularity of MONTH, you should not use concurrent append and replace.
- Only a single task can hold a `REPLACE` lock on a given interval of a datasource.
- Multiple tasks can hold `APPEND` locks on a given interval of a datasource and append data to that interval simultaneously.
:::
### Configure concurrent append and replace
##### Update the compaction settings with the UI
In the **Compaction config** for a datasource, set **Allow concurrent compactions (experimental)** to **True**.
##### Update the compaction settings with the API
Prepare your datasource for concurrent append and replace by setting its task lock type to `REPLACE`.
Add the `taskContext` like you would any other automatic compaction setting through the API:
```shell
curl --location --request POST 'http://localhost:8081/druid/coordinator/v1/config/compaction' \
--header 'Content-Type: application/json' \
--data-raw '{
"dataSource": "YOUR_DATASOURCE",
"taskContext": {
"taskLockType": "REPLACE"
}
}'
```
#### Add a task lock type to your ingestion job
Next, you need to configure the task lock type for your ingestion job:
- For streaming jobs, the context parameter goes in your supervisor spec, and the lock type is always `APPEND`.
- For legacy JSON-based batch ingestion, the context parameter goes in your ingestion spec, and the lock type can be either `APPEND` or `REPLACE`.
You can provide the context parameter through the API, like any other ingestion job parameter, or through the UI.
##### Add a task lock using the Druid console
As part of the **Load data** wizard for classic batch (JSON-based ingestion) and streaming ingestion, you can configure the task lock type for the ingestion during the **Publish** step:
- If you set **Append to existing** to **True**, you can then set **Allow concurrent append tasks (experimental)** to **True**.
- If you set **Append to existing** to **False**, you can then set **Allow concurrent replace tasks (experimental)** to **True**.
##### Add the task lock type through the API
Add the following JSON snippet to your supervisor or ingestion spec if you're using the API:
```json
"context": {
"taskLockType": LOCK_TYPE
}
```
The `LOCK_TYPE` depends on what you're trying to accomplish.
Set `taskLockType` to `APPEND` if either of the following is true:
- Dynamic partitioning with append to existing is set to `true`
- The ingestion job is a streaming ingestion job
If you have multiple append ingestion jobs all targeting the same datasource and you want them to run simultaneously, you also need to include the following context parameter:
```json
"useSharedLock": "true"
```
Keep in mind that `taskLockType` takes precedence over `useSharedLock`. Do not use `useSharedLock` with `REPLACE` task locks.
Set `taskLockType` to `REPLACE` if you're replacing data. For example, if you use any of the following partitioning types, use `REPLACE`:
- hash partitioning
- range partitioning
- dynamic partitioning with append to existing set to `false`
## Learn more
See the following topics for more information:

View File

@ -0,0 +1,142 @@
---
id: concurrent-append-replace
title: Concurrent append and replace
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
:::info
Concurrent append and replace is an [experimental feature](../development/experimental.md) available for JSON-based batch and streaming ingestion. It is not currently available for SQL-based ingestion.
:::
Concurrent append and replace safely replaces the existing data in an interval of a datasource while new data is being appended to that interval. One of the most common applications of this feature is appending new data (such as with streaming ingestion) to an interval while compaction of that interval is already in progress. Druid dynamically partitions the data ingested during this time; the subsequent compaction run then partitions the data into the granularity you specified.
To set up concurrent append and replace, use the context flag `useConcurrentLocks`. Druid will then determine the correct lock type for you, either append or replace. Although you can set the type of lock manually, we don't recommend it.
## Update the compaction settings
If you want to append data to a datasource while compaction is running, you need to enable concurrent append and replace for the datasource by updating the compaction settings.
### Update the compaction settings with the UI
In the **Compaction config** for a datasource, enable **Allow concurrent compactions (experimental)**.
For details on accessing the compaction config in the UI, see [Enable automatic compaction with the web console](../data-management/automatic-compaction.md#web-console).
### Update the compaction settings with the API
Add the `taskContext` like you would any other automatic compaction setting through the API:
```shell
curl --location --request POST 'http://localhost:8081/druid/coordinator/v1/config/compaction' \
--header 'Content-Type: application/json' \
--data-raw '{
"dataSource": "YOUR_DATASOURCE",
"taskContext": {
"useConcurrentLocks": true
}
}'
```
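To confirm the change, you can read the configuration back; a sketch assuming the corresponding GET endpoint for a single datasource:
```shell
curl "http://localhost:8081/druid/coordinator/v1/config/compaction/YOUR_DATASOURCE"
```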
## Configure a task lock type for your ingestion job
You also need to configure the ingestion job to allow concurrent tasks.
You can provide the context parameter like any other parameter for ingestion jobs through the API or the UI.
### Add a task lock using the Druid console
As part of the **Load data** wizard for classic batch (JSON-based ingestion) and streaming ingestion, enable the following config on the **Publish** step: **Allow concurrent tasks (experimental)**.
### Add the task lock through the API
Add the following JSON snippet to your supervisor or ingestion spec if you're using the API:
```json
"context": {
"useConcurrentLocks": true
}
```
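For placement, here is a minimal sketch of a JSON-based batch task spec with the context set; every field other than `context` is a placeholder, and a supervisor spec takes the same top-level `context` field:
```json
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": { "...": "..." },
    "ioConfig": { "...": "..." },
    "tuningConfig": { "...": "..." }
  },
  "context": {
    "useConcurrentLocks": true
  }
}
```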
## Task lock types
We recommend that you use the `useConcurrentLocks` context parameter so that Druid automatically determines the task lock types for you. If, for some reason, you need to set the task lock types manually, you can read more about them in this section.
<details><summary>Click here to read more about the lock types.</summary>
Druid uses task locks to make sure that multiple conflicting operations don't happen at once.
There are two task lock types: `APPEND` and `REPLACE`. The type of lock you use is determined by what you're trying to accomplish.
When setting task lock types manually, be aware of the following:
- The segment granularity of the append task must be equal to or finer than the segment granularity of the replace task.
- Concurrent append and replace fails if the task with `APPEND` lock uses a coarser segment granularity than the task with the `REPLACE` lock. For example, if the `APPEND` task uses a segment granularity of YEAR and the `REPLACE` task uses a segment granularity of MONTH, you should not use concurrent append and replace.
- Only a single task can hold a `REPLACE` lock on a given interval of a datasource.
- Multiple tasks can hold `APPEND` locks on a given interval of a datasource and append data to that interval simultaneously.
#### Add a task lock type to your ingestion job
You configure the task lock type for your ingestion job as follows:
- For streaming jobs, the `taskLockType` context parameter goes in your supervisor spec, and the lock type is always `APPEND`.
- For classic JSON-based batch ingestion, the `taskLockType` context parameter goes in your ingestion spec, and the lock type can be either `APPEND` or `REPLACE`.
You can provide the context parameter through the API, like any other ingestion job parameter, or through the UI.
##### Add a task lock using the Druid console
As part of the **Load data** wizard for classic batch (JSON-based ingestion) and streaming ingestion, you can configure the task lock type for the ingestion during the **Publish** step:
- If you set **Append to existing** to **True**, you can then set **Allow concurrent append tasks (experimental)** to **True**.
- If you set **Append to existing** to **False**, you can then set **Allow concurrent replace tasks (experimental)** to **True**.
##### Add the task lock type through the API
Add the following JSON snippet to your supervisor or ingestion spec if you're using the API:
```json
"context": {
"taskLockType": LOCK_TYPE
}
```
The `LOCK_TYPE` depends on what you're trying to accomplish.
Set `taskLockType` to `APPEND` if either of the following is true:
- Dynamic partitioning with append to existing is set to `true`
- The ingestion job is a streaming ingestion job
If you have multiple append ingestion jobs all targeting the same datasource and you want them to run simultaneously, you also need to include the following context parameter:
```json
"useSharedLock": "true"
```
Keep in mind that `taskLockType` takes precedence over `useSharedLock`. Do not use `useSharedLock` with `REPLACE` task locks.
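Putting the two parameters together, the context for multiple simultaneous append jobs on one datasource looks like this:
```json
"context": {
  "taskLockType": "APPEND",
  "useSharedLock": "true"
}
```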
Set `taskLockType` to `REPLACE` if you're replacing data. For example, if you use any of the following partitioning types, use `REPLACE` (see the sketch after this list):
- hash partitioning
- range partitioning
- dynamic partitioning with append to existing set to `false`
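A sketch of a JSON-based batch task that uses range partitioning and therefore a `REPLACE` lock; the partition dimension and row target are placeholders, and range partitioning also requires `forceGuaranteedRollup`:
```json
{
  "type": "index_parallel",
  "spec": {
    "tuningConfig": {
      "type": "index_parallel",
      "forceGuaranteedRollup": true,
      "partitionsSpec": {
        "type": "range",
        "partitionDimensions": ["YOUR_DIMENSION"],
        "targetRowsPerSegment": 5000000
      }
    }
  },
  "context": {
    "taskLockType": "REPLACE"
  }
}
```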
</details>

View File

@ -95,6 +95,9 @@ The `maxNumConcurrentSubTasks` in the `tuningConfig` determines the number of co
By default, JSON-based batch ingestion replaces all data in the intervals in your `granularitySpec` for any segment that it writes to. If you want to add to the segment instead, set the `appendToExisting` flag in the `ioConfig`. JSON-based batch ingestion only replaces data in segments where it actively adds data. If there are segments in the intervals for your `granularitySpec` that don't have data from a task, they remain unchanged. If any existing segments partially overlap with the intervals in the `granularitySpec`, the portion of those segments outside the interval for the new spec remain visible.
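As a sketch, the `ioConfig` fragment that switches a JSON-based batch task from replacing to appending:
```json
"ioConfig": {
  "type": "index_parallel",
  "appendToExisting": true
}
```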
You can also perform concurrent append and replace tasks. For more information, see [Concurrent append and replace](./concurrent-append-replace.md).
#### Fully replacing existing segments using tombstones
:::info

View File

@ -96,6 +96,7 @@
},
"ingestion/ingestion-spec",
"ingestion/schema-design",
"ingestion/concurrent-append-replace",
"ingestion/faq"
],
"Data management": [