updates Kafka and Kinesis to use . Fixes some typos and other style i… (#11624)

* updates Kafka and Kinesis to use . Fixes some typos and other style issues for Kafka.

* fix spelling

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: Jihoon Son <jihoonson@apache.org>

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: Jihoon Son <jihoonson@apache.org>

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: Jihoon Son <jihoonson@apache.org>

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: Jihoon Son <jihoonson@apache.org>

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: Jihoon Son <jihoonson@apache.org>

* Update docs/development/extensions-core/kinesis-ingestion.md

Co-authored-by: Jihoon Son <jihoonson@apache.org>

* Update docs/development/extensions-core/kinesis-ingestion.md

Co-authored-by: Jihoon Son <jihoonson@apache.org>

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: Jihoon Son <jihoonson@apache.org>

* address comments

Co-authored-by: Jihoon Son <jihoonson@apache.org>
This commit is contained in:
Charles Smith 2021-08-26 13:22:30 -07:00 committed by GitHub
parent ac2b65e837
commit 9032a0b079
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 219 additions and 253 deletions

View File

@ -23,101 +23,99 @@ sidebar_label: "Apache Kafka"
~ under the License.
-->
When you enable the Kafka indexing service, you can configure *supervisors* on the Overlord to manage the creation and lifetime of Kafka indexing tasks. These indexing tasks read events using Kafka's own partition and offset mechanism to guarantee exactly-once ingestion. The supervisor oversees the state of the indexing tasks to:
- coordinate handoffs
- manage failures
- ensure that scalability and replication requirements are maintained.
The Kafka indexing service enables the configuration of *supervisors* on the Overlord, which facilitate ingestion from
Kafka by managing the creation and lifetime of Kafka indexing tasks. These indexing tasks read events using Kafka's own
partition and offset mechanism and are therefore able to provide guarantees of exactly-once ingestion.
The supervisor oversees the state of the indexing tasks to coordinate handoffs,
manage failures, and ensure that the scalability and replication requirements are maintained.
To use the Kafka indexing service, load the `druid-kafka-indexing-service` core Apache Druid extension. See [Including Extensions](../../development/extensions.md#loading-extensions)).
This service is provided in the `druid-kafka-indexing-service` core Apache Druid extension (see
[Including Extensions](../../development/extensions.md#loading-extensions)).
This topic covers the ingestion spec for Kafka. For a general `ingestionSpec` reference, see [Ingestion specs](../../ingestion/ingestion-spec.md). For a walk-through, check out the [Loading from Apache Kafka](../../tutorials/tutorial-kafka.md) tutorial.
> The Kafka indexing service supports transactional topics which were introduced in Kafka 0.11.x. It is the default behavior of Druid and make the
> Kafka consumer that Druid uses incompatible with older brokers. Ensure that your Kafka brokers are version 0.11.x or
> better before using this functionality. Refer [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade)
> if you are using older version of Kafka brokers.
> In addition, users could set `isolation.level` `read_uncommitted` in `consumerProperties`, if don't need Druid to consume transactional topics or need Druid to consume older versions of Kafka.
> Make sure offsets are sequential, since there is no offset gap check in Druid anymore.
## Kafka support
The Kafka indexing service supports transactional topics introduced in Kafka 0.11.x by default. The consumer for Kafka indexing service is incompatible with older Kafka brokers. If you are using an older version, refer to the [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade).
> If your Kafka cluster enables consumer-group based ACLs, you can set `group.id` in `consumerProperties` to override the default auto generated group id.
Additionally, you can set `isolation.level` to `read_uncommitted` in `consumerProperties` if either:
- You don't need Druid to consume transactional topics.
- You need Druid to consume older versions of Kafka. Make sure offsets are sequential, since there is no offset gap check in Druid anymore.
## Tutorial
This page contains reference documentation for Apache Kafka-based ingestion.
For a walk-through instead, check out the [Loading from Apache Kafka](../../tutorials/tutorial-kafka.md) tutorial.
If your Kafka cluster enables consumer-group based ACLs, you can set `group.id` in `consumerProperties` to override the default auto generated group id.
## Submitting a Supervisor Spec
The Kafka indexing service requires that the `druid-kafka-indexing-service` extension be loaded on both the Overlord and the
MiddleManagers. A supervisor for a dataSource is started by submitting a supervisor spec via HTTP POST to
`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor`, for example:
To use the Kafka indexing service, load the `druid-kafka-indexing-service` extension on both the Overlord and the MiddleManagers. Druid starts a supervisor for a dataSource when you submit a supervisor spec. You can use the following endpoint:
`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor`
For example:
```
curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor
```
A sample supervisor spec is shown below:
Where the file `supervisor-spec.json` contains a Kafka supervisor spec:
```json
{
"type": "kafka",
"dataSchema": {
"dataSource": "metrics-kafka",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": [
"timestamp",
"value"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
"spec": {
"dataSchema": {
"dataSource": "metrics-kafka",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": [
"timestamp",
"value"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
},
"ioConfig": {
"topic": "metrics",
"inputFormat": {
"type": "json"
},
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000
}
},
"ioConfig": {
"topic": "metrics",
"inputFormat": {
"type": "json"
},
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000
}
}
}
```
@ -125,27 +123,28 @@ A sample supervisor spec is shown below:
|Field|Description|Required|
|--------|-----------|---------|
|`type`|The supervisor type, this should always be `kafka`.|yes|
|`dataSchema`|The schema that will be used by the Kafka indexing task during ingestion. See [`dataSchema`](../../ingestion/ingestion-spec.md#dataschema) for details.|yes|
|`ioConfig`|A KafkaSupervisorIOConfig object for configuring Kafka connection and I/O-related settings for the supervisor and indexing task. See [KafkaSupervisorIOConfig](#kafkasupervisorioconfig) below.|yes|
|`tuningConfig`|A KafkaSupervisorTuningConfig object for configuring performance-related settings for the supervisor and indexing tasks. See [KafkaSupervisorTuningConfig](#kafkasupervisortuningconfig) below.|no|
|`type`|Supervisor type. For Kafka streaming, set to `kafka`.|yes|
|`spec`| Container object for the supervisor configuration. | yes |
|`dataSchema`|Schema for the Kafka indexing task to use during ingestion.|yes|
|`ioConfig`|A `KafkaSupervisorIOConfig` object to define the Kafka connection and I/O-related settings for the supervisor and indexing task. See [KafkaSupervisorIOConfig](#kafkasupervisorioconfig).|yes|
|`tuningConfig`|A KafkaSupervisorTuningConfig object to define performance-related settings for the supervisor and indexing tasks. See [KafkaSupervisorTuningConfig](#kafkasupervisortuningconfig).|no|
### KafkaSupervisorIOConfig
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|`topic`|String|The Kafka topic to read from. This must be a specific topic as topic patterns are not supported.|yes|
|`inputFormat`|Object|[`inputFormat`](../../ingestion/data-formats.md#input-format) to specify how to parse input data. See [the below section](#specifying-data-format) for details about specifying the input format.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to be passed to the Kafka consumer. See [next section](#more-on-consumerproperties) for more information.|yes|
|`topic`|String|The Kafka topic to read from. Must be a specific topic. Topic patterns are not supported.|yes|
|`inputFormat`|Object|`inputFormat` to define input data parsing. See [Specifying data format](#specifying-data-format) for details about specifying the input format.|yes|
|`consumerProperties`|Map<String, Object>|A map of properties to pass to the Kafka consumer. See [More on consumer properties](#more-on-consumerproperties).|yes|
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)|
|`replicas`|Integer|The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.|no (default == 1)|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. This means that the maximum number of reading tasks will be `taskCount * replicas` and the total number of tasks (*reading* + *publishing*) will be higher than this. See [Capacity Planning](#capacity-planning) below for more details. The number of reading tasks will be less than `taskCount` if `taskCount > {numKafkaPartitions}`.|no (default == 1)|
|`taskDuration`|ISO8601 Period|The length of time before tasks stop reading and begin publishing their segment.|no (default == PT1H)|
|`replicas`|Integer|The number of replica sets. "1" means a single set of tasks without replication. Druid always assigns replica tasks to different workers to provide resiliency against worker failure.|no (default == 1)|
|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. The maximum number of reading tasks equals `taskCount * replicas`. Therefore, the total number of tasks, *reading* + *publishing*, is greater than this count. See [Capacity Planning](#capacity-planning) for more details. When `taskCount > {numKafkaPartitions}`, the actual number of reading tasks is less than the `taskCount` value.|no (default == 1)|
|`taskDuration`|ISO8601 Period|The length of time before tasks stop reading and begin publishing segments.|no (default == PT1H)|
|`startDelay`|ISO8601 Period|The period to wait before the supervisor starts managing tasks.|no (default == PT5S)|
|`period`|ISO8601 Period|How often the supervisor will execute its management logic. Note that the supervisor will also run in response to certain events (such as tasks succeeding, failing, and reaching their taskDuration) so this value specifies the maximum time between iterations.|no (default == PT30S)|
|`useEarliestOffset`|Boolean|If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run.|no (default == false)|
|`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)|
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`period`|ISO8601 Period|Frequency at which the supervisor executes its management logic. The supervisor also runs in response to certain events. For example task success, task failure, and tasks reaching their `taskDuration`. The `period` value specifies the maximum time between iterations.|no (default == PT30S)|
|`useEarliestOffset`|Boolean|If a supervisor manages a dataSource for the first time, it obtains a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended. Therefore Druid only uses `useEarliestOffset` on first run.|no (default == false)|
|`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)|
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`autoScalerConfig`|Object|`autoScalerConfig` to specify how to auto scale the number of Kafka ingest tasks. ONLY supported for Kafka indexing as of now. See [Tasks Autoscaler Properties](#Task Autoscaler Properties) for details.|no (default == null)|
@ -156,18 +155,18 @@ A sample supervisor spec is shown below:
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `enableTaskAutoScaler` | Whether enable this feature or not. Set false or ignored here will disable `autoScaler` even though `autoScalerConfig` is not null| no (default == false) |
| `taskCountMax` | Maximum value of task count. Make Sure `taskCountMax >= taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, the maximum number of reading tasks would be equal to `{numKafkaPartitions}` and `taskCountMax` would be ignored. | yes |
| `taskCountMin` | Minimum value of task count. When enable autoscaler, the value of taskCount in `IOConfig` will be ignored, and `taskCountMin` will be the number of tasks that ingestion starts going up to `taskCountMax`| yes |
| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions | no (default == 600000) |
| `autoScalerStrategy` | The algorithm of `autoScaler`. ONLY `lagBased` is supported for now. See [Lag Based AutoScaler Strategy Related Properties](#Lag Based AutoScaler Strategy Related Properties) for details.| no (default == `lagBased`) |
| `enableTaskAutoScaler` | Enable or disable autoscaling. `false` or blank disables the `autoScaler` even when `autoScalerConfig` is not null| no (default == false) |
| `taskCountMax` | Maximum number of ingestion tasks. Set `taskCountMax >= taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales reading tasks up to the `{numKafkaPartitions}`. In this case `taskCountMax` is ignored. | yes |
| `taskCountMin` | Minimum number of ingestion tasks. When you enable autoscaler, Druid ignores the value of taskCount in `IOConfig` and starts with the `taskCountMin` number of tasks.| yes |
| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions. | no (default == 600000) |
| `autoScalerStrategy` | The algorithm of `autoScaler`. Only supports `lagBased`. See [Lag Based AutoScaler Strategy Related Properties](#Lag Based AutoScaler Strategy Related Properties) for details.| no (default == `lagBased`) |
### Lag Based AutoScaler Strategy Related Properties
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `lagCollectionIntervalMillis` | Period of lag points collection. | no (default == 30000) |
| `lagCollectionRangeMillis` | The total time window of lag collection, Use with `lagCollectionIntervalMillis`it means that in the recent `lagCollectionRangeMillis`, collect lag metric points every `lagCollectionIntervalMillis`. | no (default == 600000) |
| `scaleOutThreshold` | The Threshold of scale out action | no (default == 6000000) |
| `lagCollectionRangeMillis` | The total time window of lag collection. Use with `lagCollectionIntervalMillis`it means that in the recent `lagCollectionRangeMillis`, collect lag metric points every `lagCollectionIntervalMillis`. | no (default == 600000) |
| `scaleOutThreshold` | The threshold of scale out action | no (default == 6000000) |
| `triggerScaleOutFractionThreshold` | If `triggerScaleOutFractionThreshold` percent of lag points are higher than `scaleOutThreshold`, then do scale out action. | no (default == 0.3) |
| `scaleInThreshold` | The Threshold of scale in action | no (default == 1000000) |
| `triggerScaleInFractionThreshold` | If `triggerScaleInFractionThreshold` percent of lag points are lower than `scaleOutThreshold`, then do scale in action. | no (default == 0.9) |
@ -176,84 +175,46 @@ A sample supervisor spec is shown below:
| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
A sample supervisor spec with `lagBased` autoScaler enabled is shown below:
The following example demonstrates supervisor spec with `lagBased` autoScaler enabled:
```json
{
"type": "kafka",
"dataSchema": {
"dataSource": "metrics-kafka",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
],
"dimensionExclusions": [
"timestamp",
"value"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"ioConfig": {
"topic": "metrics",
"inputFormat": {
"type": "json"
},
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"autoScalerConfig": {
"enableTaskAutoScaler": true,
"taskCountMax": 6,
"taskCountMin": 2,
"minTriggerScaleActionFrequencyMillis": 600000,
"autoScalerStrategy": "lagBased",
"lagCollectionIntervalMillis": 30000,
"lagCollectionRangeMillis": 600000,
"scaleOutThreshold": 6000000,
"triggerScaleOutFractionThreshold": 0.3,
"scaleInThreshold": 1000000,
"triggerScaleInFractionThreshold": 0.9,
"scaleActionStartDelayMillis": 300000,
"scaleActionPeriodMillis": 60000,
"scaleInStep": 1,
"scaleOutStep": 2
},
"taskCount":1,
"replicas":1,
"taskDuration":"PT1H"
},
"tuningConfig":{
"type":"kafka",
"maxRowsPerSegment":5000000
"spec": {
"dataSchema": {
...
},
"ioConfig": {
"topic": "metrics",
"inputFormat": {
"type": "json"
},
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"autoScalerConfig": {
"enableTaskAutoScaler": true,
"taskCountMax": 6,
"taskCountMin": 2,
"minTriggerScaleActionFrequencyMillis": 600000,
"autoScalerStrategy": "lagBased",
"lagCollectionIntervalMillis": 30000,
"lagCollectionRangeMillis": 600000,
"scaleOutThreshold": 6000000,
"triggerScaleOutFractionThreshold": 0.3,
"scaleInThreshold": 1000000,
"triggerScaleInFractionThreshold": 0.9,
"scaleActionStartDelayMillis": 300000,
"scaleActionPeriodMillis": 60000,
"scaleInStep": 1,
"scaleOutStep": 2
},
"taskCount":1,
"replicas":1,
"taskDuration":"PT1H"
},
"tuningConfig":{
...
}
}
}
```
@ -273,13 +234,16 @@ Note: SSL connections may also be supplied using the deprecated [Password Provid
#### Specifying data format
Kafka indexing service supports both [`inputFormat`](../../ingestion/data-formats.md#input-format) and [`parser`](../../ingestion/data-formats.md#parser) to specify the data format.
The `inputFormat` is a new and recommended way to specify the data format for Kafka indexing service,
but unfortunately, it doesn't support all data formats supported by the legacy `parser`.
(They will be supported in the future.)
Use the `inputFormat` to specify the data format for Kafka indexing service unless you need a format only supported by the legacy `parser`.
The supported `inputFormat`s include [`csv`](../../ingestion/data-formats.md#csv),
[`delimited`](../../ingestion/data-formats.md#tsv-delimited), [`json`](../../ingestion/data-formats.md#json), [`avro_stream`](../../ingestion/data-formats.md#avro-stream), [`protobuf`](../../ingestion/data-formats.md#protobuf).
You can also read [`thrift`](../extensions-contrib/thrift.md) formats using `parser`.
Supported `inputFormat`s include:
- `csv`
- `delimited`
- `json`
- `avro_stream`
- `protobuf`
For more information, see [Data formats](../../ingestion/data-formats.md). You can also read [`thrift`](../extensions-contrib/thrift.md) formats using `parser`.
<a name="tuningconfig"></a>
@ -396,7 +360,7 @@ supervisor executes this run loop after startup or after resuming from a suspens
initialization-type issues, where the supervisor is unable to reach a stable state (perhaps because it can't connect to
Kafka, it can't read from the Kafka topic, or it can't communicate with existing tasks). Once the supervisor is stable -
that is, once it has completed a full execution without encountering any issues - `detailedState` will show a `RUNNING`
state until it is stopped, suspended, or hits a failure threshold and transitions to an unhealthy state.
state until it is stopped, suspended, or hits a task failure threshold and transitions to an unhealthy state.
### Getting Supervisor Ingestion Stats Report
@ -519,22 +483,18 @@ and begin publishing their segments. A new supervisor will then be started which
will start reading from the offsets where the previous now-publishing tasks left off, but using the updated schema.
In this way, configuration changes can be applied without requiring any pause in ingestion.
### Deployment Notes
### Deployment Notes on Kafka partitions and Druid segments
#### On the Subject of Segments
Druid assigns each Kafka indexing task Kafka partitions. A task writes the events it consumes from Kafka into a single segment for the segment granularity interval until it reaches one of the following: `maxRowsPerSegment`, `maxTotalRows` or `intermediateHandoffPeriod` limit. At this point, the task creates a new partition for this segment granularity to contain subsequent events.
Each Kafka Indexing Task puts events consumed from Kafka partitions assigned to it in a single segment for each segment
granular interval until maxRowsPerSegment, maxTotalRows or intermediateHandoffPeriod limit is reached, at this point a new partition
for this segment granularity is created for further events. Kafka Indexing Task also does incremental hand-offs which
means that all the segments created by a task will not be held up till the task duration is over. As soon as maxRowsPerSegment,
maxTotalRows or intermediateHandoffPeriod limit is hit, all the segments held by the task at that point in time will be handed-off
and new set of segments will be created for further events. This means that the task can run for longer durations of time
without accumulating old segments locally on Middle Manager processes and it is encouraged to do so.
The Kafka Indexing Task also does incremental hand-offs. Therefore segments become available as they are ready and you do not have to wait for all segments until the end of the task duration. When the task reaches one of `maxRowsPerSegment`, `maxTotalRows`, or `intermediateHandoffPeriod`, it hands off all the segments and creates a new new set of segments will be created for further events. This allows the task to run for longer durations without accumulating old segments locally on Middle Manager processes.
Kafka Indexing Service may still produce some small segments. Lets say the task duration is 4 hours, segment granularity
is set to an HOUR and Supervisor was started at 9:10 then after 4 hours at 13:10, new set of tasks will be started and
events for the interval 13:00 - 14:00 may be split across previous and new set of tasks. If you see it becoming a problem then
one can schedule re-indexing tasks be run to merge segments together into new segments of an ideal size (in the range of ~500-700 MB per segment).
Details on how to optimize the segment size can be found on [Segment size optimization](../../operations/segment-optimization.md).
The Kafka Indexing Service may still produce some small segments. For example, consider the following scenario:
- Task duration is 4 hours
- Segment granularity is set to an HOUR
- The supervisor was started at 9:10
After 4 hours at 13:10, Druid starts a new set of tasks. The events for the interval 13:00 - 14:00 may be split across existing tasks and the new set of tasks which could result in small segments. To merge them together into new segments of an ideal size (in the range of ~500-700 MB per segment), you can schedule re-indexing tasks, optionally with a different segment granularity.
For more detail, see [Segment size optimization](../../operations/segment-optimization.md).
There is also ongoing work to support automatic segment compaction of sharded segments as well as compaction not requiring
Hadoop (see [here](https://github.com/apache/druid/pull/5102)).

View File

@ -23,87 +23,92 @@ sidebar_label: "Amazon Kinesis"
~ under the License.
-->
When you enable the Kinesis indexing service, you can configure *supervisors* on the Overlord to manage the creation and lifetime of Kinesis indexing tasks. These indexing tasks read events using Kinesis' own shard and sequence number mechanism to guarantee exactly-once ingestion. The supervisor oversees the state of the indexing tasks to:
- coordinate handoffs
- manage failures
- ensure that scalability and replication requirements are maintained.
Similar to the [Kafka indexing service](./kafka-ingestion.md), the Kinesis indexing service for Apache Druid enables the configuration of *supervisors* on the Overlord. These supervisors facilitate ingestion from Kinesis by managing the creation and lifetime of Kinesis indexing tasks. These indexing tasks read events using Kinesis's own
Shards and Sequence Number mechanism and are therefore able to provide guarantees of exactly-once ingestion.
The supervisor oversees the state of the indexing tasks to coordinate handoffs, manage failures,
and ensure that the scalability and replication requirements are maintained.
The Kinesis indexing service is provided as the `druid-kinesis-indexing-service` core Apache Druid extension (see
To use the Kinesis indexing service, load the `druid-kinesis-indexing-service` core Apache Druid extension (see
[Including Extensions](../../development/extensions.md#loading-extensions)).
> Before you deploy the Kinesis extension to production, read the [Kinesis known issues](#kinesis-known-issues).
## Submitting a Supervisor Spec
The Kinesis indexing service requires that the `druid-kinesis-indexing-service` extension be loaded on both the Overlord
and the MiddleManagers. A supervisor for a dataSource is started by submitting a supervisor spec via HTTP POST to
`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor`, for example:
To use the Kinesis indexing service, load the `druid-kinesis-indexing-service` extension on both the Overlord and the MiddleManagers. Druid starts a supervisor for a dataSource when you submit a supervisor spec. Submit your supervisor spec to the following endpoint:
`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor`
For example:
```
curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor
```
A sample supervisor spec is shown below:
Where the file `supervisor-spec.json` contains a Kinesis supervisor spec:
```json
{
"type": "kinesis",
"dataSchema": {
"dataSource": "metrics-kinesis",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": [
"timestamp",
"value"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
"spec": {
"dataSchema": {
"dataSource": "metrics-kinesis",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": [
"timestamp",
"value"
]
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"ioConfig": {
"stream": "metrics",
"inputFormat": {
"type": "json"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"ioConfig": {
"stream": "metrics",
"inputFormat": {
"type": "json"
},
"endpoint": "kinesis.us-east-1.amazonaws.com",
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H",
"recordsPerFetch": 2000,
"fetchDelayMillis": 1000
},
"tuningConfig": {
"type": "kinesis",
"maxRowsPerSegment": 5000000
"endpoint": "kinesis.us-east-1.amazonaws.com",
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H",
"recordsPerFetch": 2000,
"fetchDelayMillis": 1000
},
"tuningConfig": {
"type": "kinesis",
"maxRowsPerSegment": 5000000
}
}
}
```
@ -114,6 +119,7 @@ A sample supervisor spec is shown below:
|Field|Description|Required|
|--------|-----------|---------|
|`type`|The supervisor type, this should always be `kinesis`.|yes|
|`spec`|Container object for the supervisor configuration.|yes|
|`dataSchema`|The schema that will be used by the Kinesis indexing task during ingestion. See [`dataSchema`](../../ingestion/ingestion-spec.md#dataschema).|yes|
|`ioConfig`|A KinesisSupervisorIOConfig object for configuring Kafka connection and I/O-related settings for the supervisor and indexing task. See [KinesisSupervisorIOConfig](#kinesissupervisorioconfig) below.|yes|
|`tuningConfig`|A KinesisSupervisorTuningConfig object for configuring performance-related settings for the supervisor and indexing tasks. See [KinesisSupervisorTuningConfig](#kinesissupervisortuningconfig) below.|no|