9.5 KiB
本地批摄入
Apache Druid当前支持两种类型的本地批量索引任务, index_parallel
可以并行的运行多个任务, index
运行单个索引任务。 详情可以查看 基于Hadoop的摄取vs基于本地批摄取的对比 来了解基于Hadoop的摄取、本地简单批摄取、本地并行摄取三者的比较。
要运行这两种类型的本地批索引任务,请按以下指定编写摄取规范。然后将其发布到Overlord的 /druid/indexer/v1/task
接口,或者使用druid附带的 bin/post-index-task
。
教程
此页包含本地批处理摄取的参考文档。相反,如果要进行演示,请查看 加载文件教程,该教程演示了"简单"(单任务)模式
并行任务
并行任务(index_parallel
类型)是用于并行批索引的任务。此任务只使用Druid的资源,不依赖于其他外部系统,如Hadoop。index_parallel
任务是一个supervisor任务,它协调整个索引过程。supervisor分割输入数据并创建辅助任务来处理这些分割, 创建的worker将发布给Overlord,以便在MiddleManager或Indexer上安排和运行。一旦worker成功处理分配的输入拆分,它就会将生成的段列表报告给supervisor任务。supervisor定期检查工作任务的状态。如果其中一个失败,它将重试失败的任务,直到重试次数达到配置的限制。如果所有工作任务都成功,它将立即发布报告的段并完成摄取。
并行任务的详细行为是不同的,取决于 partitionsSpec
,详情可以查看 partitionsSpec
要使用此任务,ioConfig
中的 inputSource
应为splittable(可拆分的),tuningConfig
中的 maxNumConcurrentSubTasks
应设置为大于1。否则,此任务将按顺序运行;index_parallel
任务将逐个读取每个输入文件并自行创建段。目前支持的可拆分输入格式有:
s3
从AWS S3存储读取数据gs
从谷歌云存储读取数据azure
从Azure Blob存储读取数据hdfs
从HDFS存储中读取数据http
从HTTP服务中读取数据local
从本地存储中读取数据druid
从Druid数据源中读取数据
传统的 firehose
支持其他一些云存储类型。下面的 firehose
类型也是可拆分的。请注意,firehose
只支持文本格式。
您可能需要考虑以下事项:
- 您可能希望控制每个worker进程的输入数据量。这可以使用不同的配置进行控制,具体取决于并行摄取的阶段(有关更多详细信息,请参阅
partitionsSpec
。对于从inputSource
读取数据的任务,可以在tuningConfig
中设置 分割提示规范。对于合并无序段的任务,可以在tuningConfig
中设置totalNumMergeTasks
。 - 并行摄取中并发worker的数量由
tuningConfig
中的maxNumConcurrentSubTasks
确定。supervisor检查当前正在运行的worker的数量,如果小于maxNumConcurrentSubTasks
,则无论当前有多少任务槽可用,都会创建更多的worker。这可能会影响其他摄取性能。有关更多详细信息,请参阅下面的 容量规划部分。 - 默认情况下,批量摄取将替换它写入的任何段中的所有数据(在
granularitySpec
的间隔中)。如果您想添加到段中,请在ioConfig
中设置appendToExisting
标志。请注意,它只替换主动添加数据的段中的数据:如果granularitySpec
的间隔中有段没有此任务写入的数据,则它们将被单独保留。如果任何现有段与granularitySpec
的间隔部分重叠,则新段间隔之外的那些段的部分仍将可见。
任务符号
一个简易的任务如下所示:
{
"type": "index_parallel",
"spec": {
"dataSchema": {
"dataSource": "wikipedia_parallel_index_test",
"timestampSpec": {
"column": "timestamp"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city"
]
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"segmentGranularity": "DAY",
"queryGranularity": "second",
"intervals" : [ "2013-08-31/2013-09-02" ]
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "local",
"baseDir": "examples/indexing/",
"filter": "wikipedia_index_data*"
},
"inputFormat": {
"type": "json"
}
},
"tuningconfig": {
"type": "index_parallel",
"maxNumConcurrentSubTasks": 2
}
}
}
属性 | 描述 | 是否必须 |
---|---|---|
type |
任务类型,应当总是 index_parallel |
是 |
id |
任务ID。 如果该项没有显式的指定,Druid将使用任务类型、数据源名称、时间间隔、日期时间戳生成一个任务ID | 否 |
spec |
摄取规范包括数据schema、IOConfig 和 TuningConfig。详情见下边详细描述 | 是 |
context |
Context包括了多个任务配置参数。详情见下边详细描述 | 否 |
dataSchema
该字段为必须字段。
可以参见 摄取规范中的dataSchema
如果在dataSchema的 granularitySpec
中显式地指定了 intervals
,则批处理摄取将锁定启动时指定的完整间隔,并且您将快速了解指定间隔是否与其他任务(例如Kafka摄取)持有的锁重叠。否则,在发现每个间隔时,批处理摄取将锁定该间隔,因此您可能只会在摄取过程中了解到该任务与较高优先级的任务重叠。如果显式指定 intervals
,则指定间隔之外的任何行都将被丢弃。如果您知道数据的时间范围,我们建议显式地设置intervals
,以便锁定失败发生得更快,并且如果有一些带有意外时间戳的杂散数据,您不会意外地替换该范围之外的数据。
ioConfig
属性 | 描述 | 默认 | 是否必须 |
---|---|---|---|
type |
任务类型, 应当总是 index_parallel |
none | 是 |
inputFormat |
inputFormat 用来指定如何解析输入数据 |
none | 是 |
appendToExisting |
创建段作为最新版本的附加分片,有效地附加到段集而不是替换它。仅当现有段集具有可扩展类型 shardSpec 时,此操作才有效。 |
false | 否 |
tuningConfig
tuningConfig是一个可选项,如果未指定则使用默认的参数。 详情如下:
属性 | 描述 | 默认 | 是否必须 |
---|---|---|---|
type |
任务类型,应当总是 index_parallel |
none | 是 |
maxRowsPerSegment |
已废弃。使用 partitionsSpec 替代,被用来分片。 决定在每个段中有多少行。 |
5000000 | 否 |
maxRowsInMemory |
用于确定何时应该从中间层持久化到磁盘。通常用户不需要设置此值,但根据数据的性质,如果行的字节数较短,则用户可能不希望在内存中存储一百万行,应设置此值。 | 1000000 | 否 |
maxBytesInMemory |
用于确定何时应该从中间层持久化到磁盘。通常这是在内部计算的,用户不需要设置它。此值表示在持久化之前要在堆内存中聚合的字节数。这是基于对内存使用量的粗略估计,而不是实际使用量。用于索引的最大堆内存使用量为 maxBytesInMemory *(2 + maxPendingResistent) |
最大JVM内存的1/6 | 否 |
maxTotalRows |
已废弃。使用 partitionsSpec 替代。等待推送的段中的总行数。用于确定何时应进行中间推送。 |
20000000 | 否 |
numShards |
已废弃。使用 partitionsSpec 替代。当使用 hashed partitionsSpec 时直接指定要创建的分片数。如果该值被指定了且在 granularitySpec 中指定了 intervals ,那么索引任务可以跳过确定间隔/分区传递数据。如果设置了 maxRowsPerSegment ,则无法指定 numShards 。 |
null | 否 |