druid-docs-cn/DataIngestion/kafka.md

185 lines
16 KiB
Markdown
Raw Normal View History

2020-04-19 23:15:56 -04:00
<!-- toc -->
## Apache Kafka 摄取数据
2020-04-20 05:09:08 -04:00
Kafka索引服务支持在Overlord上配置*supervisors*supervisors通过管理Kafka索引任务的创建和生存期来便于从Kafka摄取数据。这些索引任务使用Kafka自己的分区和偏移机制读取事件因此能够保证只接收一次**exactly-once**。supervisor监视索引任务的状态以便于协调切换、管理故障并确保维护可伸缩性和复制要求。
这个服务由 `druid-kafka-indexing-service` 这个druid核心扩展详情请见 [扩展列表](../Development/extensions.md所提供。
> [!WARNING]
> Kafka索引服务支持在Kafka 0.11.x中引入的事务主题。这些更改使Druid使用的Kafka消费者与旧的brokers不兼容。在使用此功能之前请确保您的Kafka broker版本为0.11.x或更高版本。如果您使用的是旧版本的Kafka brokers请参阅《[Kafka升级指南](https://kafka.apache.org/documentation/#upgrade)》。
2020-04-19 23:15:56 -04:00
### 教程
2020-04-20 05:09:08 -04:00
本页包含基于Apache Kafka的摄取的参考文档。同样您可以查看 [Apache Kafka教程](../Tutorials/chapter-2.md) 中的加载。
2020-04-19 23:15:56 -04:00
### 提交一个supervisor规范
2020-04-20 05:09:08 -04:00
Kafka索引服务需要同时在Overlord和MiddleManagers中加载 `druid-kafka-indexing-service` 扩展。 用于一个数据源的supervisor通过向 `http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor` 发送一个HTTP POST请求来启动例如
```
curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor
```
一个示例supervisor规范如下
```
{
"type": "kafka",
"dataSchema": {
"dataSource": "metrics-kafka",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": [
"timestamp",
"value"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000
},
"ioConfig": {
"topic": "metrics",
"inputFormat": {
"type": "json"
},
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
}
}
```
2020-04-19 23:15:56 -04:00
### supervisor配置
2020-04-20 05:09:08 -04:00
| 字段 | 描述 | 是否必须 |
|-|-|-|-|
| `type` | supervisor类型 总是 `kafka` | 是 |
| `dataSchema` | Kafka索引服务在摄取时使用的schema。详情见 [dataSchema](ingestion.md#dataschema) | 是 |
| `ioConfig` | 用于配置supervisor和索引任务的KafkaSupervisorIOConfig详情见以下 | 是 |
| `tuningConfig` | 用于配置supervisor和索引任务的KafkaSupervisorTuningConfig详情见以下 | 是 |
2020-04-19 23:15:56 -04:00
#### KafkaSupervisorTuningConfig
2020-04-20 05:09:08 -04:00
`tuningConfig` 是可选的, 如果未被配置的话,则使用默认的参数。
2020-04-21 21:15:53 -04:00
| 字段 | 类型 | 描述 | 是否必须 |
|-|-|-|-|
| `type` | String | 索引任务类型, 总是 `kafka` | 是 |
| `maxRowsInMemory` | Integer | 在持久化之前在内存中聚合的最大行数。该数值为聚合之后的行数,所以它不等于原始输入事件的行数,而是事件被聚合后的行数。 通常用来管理所需的JVM堆内存。 使用 `maxRowsInMemory * (2 + maxPendingPersists) ` 来当做索引任务的最大堆内存。通常用户不需要设置这个值,但是也需要根据数据的特点来决定,如果行的字节数较短,用户可能不想在内存中存储一百万行,应该设置这个值 | 否(默认为 1000000|
2020-04-23 03:31:44 -04:00
| `maxBytesInMemory` | Long | 在持久化之前在内存中聚合的最大字节数。这是基于对内存使用量的粗略估计,而不是实际使用量。通常这是在内部计算的,用户不需要设置它。 索引任务的最大内存使用量是 `maxRowsInMemory * (2 + maxPendingPersists) ` | 否默认为最大JVM内存的 1/6 |
| `maxRowsPerSegment` | Integer | 聚合到一个段中的行数,该数值为聚合后的数值。 当 `maxRowsPerSegment` 或者 `maxTotalRows` 有一个值命中的时候则触发handoff数据落盘后传到深度存储 该动作也会按照每 `intermediateHandoffPeriod` 时间间隔发生一次。 | 否默认为5000000 |
| `maxTotalRows` | Long | 所有段的聚合后的行数,该值为聚合后的行数。当 `maxRowsPerSegment` 或者 `maxTotalRows` 有一个值命中的时候则触发handoff数据落盘后传到深度存储 该动作也会按照每 `intermediateHandoffPeriod` 时间间隔发生一次。 | 否默认为unlimited|
| `intermediateHandoffPeriod` | ISO8601 Period | 确定触发持续化存储的周期 | 否(默认为 PT10M|
| `maxPendingPersists` | Integer | 正在等待但启动的持久化过程的最大数量。 如果新的持久化任务超过了此限制,则在当前运行的持久化完成之前,摄取将被阻止。索引任务的最大内存使用量是 `maxRowsInMemory * (2 + maxPendingPersists) ` | 否默认为0意味着一个持久化可以与摄取同时运行而没有一个可以排队|
| `indexSpec` | Object | 调整数据被如何索引。详情可以见 [indexSpec](#indexspec) | 否 |
2020-04-24 02:01:06 -04:00
| `indexSpecForIntermediatePersists` | | 定义要在索引时用于中间持久化临时段的段存储格式选项。这可用于禁用中间段上的维度/度量压缩以减少最终合并所需的内存。但是在中间段上禁用压缩可能会增加页缓存的使用而在它们被合并到发布的最终段之前使用它们有关可能的值请参阅IndexSpec。 | 否(默认与 `indexSpec` 相同) |
| `reportParseExceptions` | Boolean | *已废弃*。如果为true则在解析期间遇到的异常即停止摄取如果为false则将跳过不可解析的行和字段。将 `reportParseExceptions` 设置为 `true` 将覆盖`maxParseExceptions` 和 `maxSavedParseExceptions` 的现有配置,将`maxParseExceptions` 设置为 `0` 并将 `maxSavedParseExceptions` 限制为不超过1。 | 否默认为false|
| `handoffConditionTimeout` | Long | 段切换(持久化)可以等待的毫秒数(超时时间)。 该值要被设置为大于0的数设置为0意味着将会一直等待不超时 | 否默认为0|
| `resetOffsetAutomatically` | Boolean | 控制当Druid需要读取Kafka中不可用的消息时的行为比如当发生了 `OffsetOutOfRangeException` 异常时。 <br> 如果为false则异常将抛出这将导致任务失败并停止接收。如果发生这种情况则需要手动干预来纠正这种情况可能使用 [重置 Supervisor API](../Operations/api.md#Supervisor)。此模式对于生产非常有用,因为它将使您意识到摄取的问题。 <br> 如果为trueDruid将根据 `useEarliestOffset` 属性的值(`true` 为 `earliest``false` 为 `latest`自动重置为Kafka中可用的较早或最新偏移量。请注意这可能导致数据在您不知情的情况下*被丢弃*(如果`useEarliestOffset` 为 `false`)或 *重复*(如果 `useEarliestOffset``true`。消息将被记录下来以标识已发生重置但摄取将继续。这种模式对于非生产环境非常有用因为它将使Druid尝试自动从问题中恢复即使这些问题会导致数据被安静删除或重复。 <br> 该特性与Kafka的 `auto.offset.reset` 消费者属性很相似 | 否默认为false|
| `workerThreads` | Integer | supervisor用于异步操作的线程数。| 否(默认为: min(10, taskCount) |
| `chatThreads` | Integer | 与索引任务的会话线程数 | 否默认为min(10, taskCount * replicas)|
| `chatRetries` | Integer | 在任务没有响应之前将重试对索引任务的HTTP请求的次数 | 否默认为8|
| `httpTimeout` | ISO8601 Period | 索引任务的HTTP响应超时 | 否默认为PT10S|
| `shutdownTimeout` | ISO8601 Period | supervisor尝试优雅的停掉一个任务的超时时间 | 否默认为PT80S|
| `offsetFetchPeriod` | ISO8601 Period | supervisor查询Kafka和索引任务以获取当前偏移和计算滞后的频率 | 否默认为PT30S最小为PT5S|
| `segmentWriteOutMediumFactory` | Object | 创建段时要使用的段写入介质。更多信息见下文。| 否(默认不指定,使用来源于 `druid.peon.defaultSegmentWriteOutMediumFactory.type` 的值)|
| `intermediateHandoffPeriod` | ISO8601 Period | 段发生切换的频率。当 `maxRowsPerSegment` 或者 `maxTotalRows` 有一个值命中的时候则触发handoff数据落盘后传到深度存储 该动作也会按照每 `intermediateHandoffPeriod` 时间间隔发生一次。 | 否默认为P2147483647D|
| `logParseExceptions` | Boolean | 如果为true则在发生解析异常时记录错误消息其中包含有关发生错误的行的信息。| 否默认为false|
| `maxParseExceptions` | Integer | 任务停止接收之前可发生的最大分析异常数。如果设置了 `reportParseExceptions`,则该值会被重写。| 否默认为unlimited|
| `maxSavedParseExceptions` | Integer | 当出现解析异常时Druid可以跟踪最新的解析异常。"maxSavedParseExceptions"决定将保存多少个异常实例。这些保存的异常将在 [任务完成报告](taskrefer.md#任务报告) 中的任务完成后可用。如果设置了`reportParseExceptions`,则该值会被重写。 | 否默认为0|
2020-04-24 05:59:09 -04:00
##### IndexSpec
| 字段 | 类型 | 描述 | 是否必须 |
|-|-|-|-|-|
| `bitmap` | Object | 位图索引的压缩格式。 应该是一个JSON对象详情见以下 | 否(默认为 `roaring`|
| `dimensionCompression` | String | 维度列的压缩格式。 从 `LZ4`, `LZF` 或者 `uncompressed` 选择 | 否(默认为 `LZ4`|
| `metricCompression` | String | Metrics列的压缩格式。 从 `LZ4`, `LZF`, `uncompressed` 或者 `none` 选择 | 否(默认为 `LZ4`|
| `longEncoding` | String | 类型为long的Metric列和维度列的编码格式。从 `auto` 或者 `longs` 中选择。`auto`编码是根据列基数使用偏移量或查找表对值进行编码,并以可变大小存储它们。`longs` 按原样存储值每个值8字节。 | 否(默认为 `longs`|
**Bitmap类型**
对于Roaring位图
| 字段 | 类型 | 描述 | 是否必须 |
|-|-|-|-|
| `type` | String | 必须为 `roaring` | 是 |
| `compressRunOnSerialization` | Boolean | 使用一个运行长度编码,可以更节省空间 | 否(默认为 `true` |
对于Concise位图
| 字段 | 类型 | 描述 | 是否必须 |
|-|-|-|-|
| `type` | String | 必须为 `concise` | 是 |
##### SegmentWriteOutMediumFactory
2020-04-20 05:09:08 -04:00
2020-04-26 05:11:18 -04:00
| 字段 | 类型 | 描述 | 是否必须 |
|-|-|-|-|
| `type` | String | 对于可用选项,可以见 [额外的Peon配置SegmentWriteOutMediumFactory](../Configuration/configuration.md#SegmentWriteOutMediumFactory) | 是 |
2020-04-20 05:09:08 -04:00
#### KafkaSupervisorIOConfig
2020-04-26 05:11:18 -04:00
| 字段 | 类型 | 描述 | 是否必须 |
|-|-|-|-|
| `topic` | String | 要读取数据的Kafka主题。这必须是一个特定的主题因为不支持主题模式 | 是 |
| `inputFormat` | Object | [`inputFormat`](dataformats.md#inputformat) 指定如何解析输入数据。 看 [下边部分](#指定输入数据格式) 查看指定输入格式的详细信息。 | 是 |
| `consumerProperties` | Map<String, Object> | 传给Kafka消费者的一组属性map。必须得包含 `bootstrap.servers` 的属性其值为Kafka Broker列表格式为: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`。 对于SSL连接`keystore`, `truststore``key` 密码可以被以一个字符串密码或者 [密码Provider](../Operations/passwordproviders.md) 来提供 | 是 |
| `pollTimeout` | Long | Kafka消费者拉取消息记录的超时等待时间毫秒单位 | 否默认为100|
| `replicas` | Integer | 副本的数量1意味着一个单一任务无副本。副本任务将始终分配给不同的worker以提供针对流程故障的恢复能力。| 否默认为1|
| `taskCount` | Integer | *一个副本集* 中*读取*任务的最大数量。 这意味着读取任务的最大的数量将是 `taskCount * replicas`, 任务总数(*读取 + 发布*)是大于这个数字的。 详情可以看下边的 [容量规划](#容量规划)。 如果 `taskCount > {numKafkaPartitions}`, 读取任务的数量会小于 `taskCount` | 否默认为1|
| `taskDuration` | ISO8601 Period | 任务停止读取数据、开始发布段之前的时间长度 | 否默认为PT1H|
| `startDelay` | ISO8601 Period | supervisor开始管理任务之前的等待时间 | 否默认为PT5S|
| `useEarliestOffset` | Boolean | 如果supervisor是第一次管理数据源它将从Kafka获得一组起始偏移。此标志确定它是检索Kafka中的最早偏移量还是最新偏移量。在正常情况下后续任务将从先前段结束的位置开始因此此标志将仅在首次运行时使用。 | 否默认false|
| `completionTimeout` | ISO8601 Period | 声明发布任务为失败并终止它 之前等待的时间长度。如果设置得太低,则任务可能永远不会发布。任务的发布时刻大约在 `taskDuration` (任务持续)时间过后开始。 | 否默认为PT30M|
| `lateMessageRejectionStartDateTime` | ISO8601 DateTime | 用来配置一个时间,当消息时间戳早于此日期时间的时候,消息被拒绝。 例如,如果该值设置为 `2016-01-01T11:00Z`, supervisor在 *`2016-01-01T12:00Z`* 创建了一个任务,时间戳早于 *2016-01-01T11:00Z* 的消息将会被丢弃。如果您的数据流有延迟消息,并且您有多个需要在同一段上操作的管道(例如实时和夜间批处理摄取管道),这可能有助于防止并发问题。 | 否默认为none|
| `lateMessageRejectionPeriod` | ISO8601 Period | 用来配置一个时间周期,当消息时间戳早于此周期的时候,消息被拒绝。例如,如果该值设置为 `PT1H`, supervisor 在 `2016-01-01T12:00Z` 创建了一个任务,则时间戳早于 `2016-01-01T11:00Z` 的消息将被丢弃。 如果您的数据流有延迟消息,并且您有多个需要在同一段上操作的管道(例如实时和夜间批处理摄取管道),这可能有助于防止并发问题。 **请特别注意**`lateMessageRejectionPeriod` 和 `lateMessageRejectionStartDateTime` 仅一个可以被指定。 | 否默认none|
| `earlyMessageRejectionPeriod` | ISO8601 Period | 用来配置一个时间周期,当消息时间戳晚于此周期的时候,消息被拒绝。 例如,如果该值设置为 `PT1H`,supervisor 在 `2016-01-01T12:00Z` 创建了一个任务,则时间戳晚于 `2016-01-01T14:00Z` 的消息将被丢弃。**注意**任务有时会超过其任务持续时间例如在supervisor故障转移的情况下。如果将 `earlyMessageRejectionPeriod` 设置得太低,则每当任务运行超过其最初配置的任务持续时间时,可能会导致消息意外丢弃。| 否默认none|
##### 指定输入数据格式
2020-04-19 23:15:56 -04:00
### 操作
#### 获取supervisor的状态报告
#### 获取supervisor摄取状态报告
#### supervisor健康检测
#### 更新现有的supervisor
#### 暂停和恢复supervisors
#### 重置supervisors
#### 终止supervisors
#### 容量规划
#### supervisor持久化
#### schema/配置变更
#### 部署注意