2020-04-23 15:31:44 +08:00

6.0 KiB
Raw Blame History

Apache Kafka 摄取数据

Kafka索引服务支持在Overlord上配置supervisorssupervisors通过管理Kafka索引任务的创建和生存期来便于从Kafka摄取数据。这些索引任务使用Kafka自己的分区和偏移机制读取事件因此能够保证只接收一次exactly-once。supervisor监视索引任务的状态以便于协调切换、管理故障并确保维护可伸缩性和复制要求。

这个服务由 druid-kafka-indexing-service 这个druid核心扩展详情请见 [扩展列表](../Development/extensions.md所提供。

Warning

Kafka索引服务支持在Kafka 0.11.x中引入的事务主题。这些更改使Druid使用的Kafka消费者与旧的brokers不兼容。在使用此功能之前请确保您的Kafka broker版本为0.11.x或更高版本。如果您使用的是旧版本的Kafka brokers请参阅《Kafka升级指南》。

教程

本页包含基于Apache Kafka的摄取的参考文档。同样您可以查看 Apache Kafka教程 中的加载。

提交一个supervisor规范

Kafka索引服务需要同时在Overlord和MiddleManagers中加载 druid-kafka-indexing-service 扩展。 用于一个数据源的supervisor通过向 http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor 发送一个HTTP POST请求来启动例如

curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor

一个示例supervisor规范如下

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka",
    "timestampSpec": {
      "column": "timestamp",
      "format": "auto"
    },
    "dimensionsSpec": {
      "dimensions": [],
      "dimensionExclusions": [
        "timestamp",
        "value"
      ]
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics",
    "inputFormat": {
      "type": "json"
    },
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}

supervisor配置

字段 描述 是否必须
type supervisor类型 总是 kafka
dataSchema Kafka索引服务在摄取时使用的schema。详情见 dataSchema
ioConfig 用于配置supervisor和索引任务的KafkaSupervisorIOConfig详情见以下
tuningConfig 用于配置supervisor和索引任务的KafkaSupervisorTuningConfig详情见以下

KafkaSupervisorTuningConfig

tuningConfig 是可选的, 如果未被配置的话,则使用默认的参数。

字段 类型 描述 是否必须
type String 索引任务类型, 总是 kafka
maxRowsInMemory Integer 在持久化之前在内存中聚合的最大行数。该数值为聚合之后的行数,所以它不等于原始输入事件的行数,而是事件被聚合后的行数。 通常用来管理所需的JVM堆内存。 使用 maxRowsInMemory * (2 + maxPendingPersists) 来当做索引任务的最大堆内存。通常用户不需要设置这个值,但是也需要根据数据的特点来决定,如果行的字节数较短,用户可能不想在内存中存储一百万行,应该设置这个值 否(默认为 1000000
maxBytesInMemory Long 在持久化之前在内存中聚合的最大字节数。这是基于对内存使用量的粗略估计,而不是实际使用量。通常这是在内部计算的,用户不需要设置它。 索引任务的最大内存使用量是 maxRowsInMemory * (2 + maxPendingPersists) 默认为最大JVM内存的 1/6
maxRowsPerSegment Integer 聚合到一个段中的行数,该数值为聚合后的数值。 当 maxRowsPerSegment 或者 maxTotalRows 有一个值命中的时候则触发handoff数据落盘后传到深度存储 该动作也会按照每 intermediateHandoffPeriod 时间间隔发生一次。 默认为5000000
maxTotalRows Long 所有段的聚合后的行数,该值为聚合后的行数。当 maxRowsPerSegment 或者 maxTotalRows 有一个值命中的时候则触发handoff数据落盘后传到深度存储 该动作也会按照每 intermediateHandoffPeriod 时间间隔发生一次。 默认为unlimited
intermediateHandoffPeriod ISO8601 Period 确定触发持续化存储的周期 否(默认为 PT10M
maxPendingPersists Integer 正在等待但启动的持久化过程的最大数量。 如果新的持久化任务超过了此限制,则在当前运行的持久化完成之前,摄取将被阻止。索引任务的最大内存使用量是 maxRowsInMemory * (2 + maxPendingPersists) 默认为0意味着一个持久化可以与摄取同时运行而没有一个可以排队
indexSpec Object 调整数据被如何索引。详情可以见 indexSpec
indexSpecForIntermediatePersists 否(默认与 indexSpec 相同)

KafkaSupervisorIOConfig

操作

获取supervisor的状态报告

获取supervisor摄取状态报告

supervisor健康检测

更新现有的supervisor

暂停和恢复supervisors

重置supervisors

终止supervisors

容量规划

supervisor持久化

schema/配置变更

部署注意