dataformat part-6

This commit is contained in:
liujianhuan 2020-04-13 16:25:03 +08:00
parent dbca0c46f8
commit b22f19063a
1 changed files with 299 additions and 0 deletions

View File

@ -550,8 +550,307 @@ Avro parseSpec可以包含使用"root"或"path"字段类型的 [flattenSpec](#fl
```
#### Parquet Hadoop Parser
> [!WARNING]
> 需要添加 [druid-parquet-extensions](../Development/parquet-extensions.md) 来使用Parquet Hadoop解析器
该解析器用于 [Hadoop批摄取](hadoopbased.md)。在 `ioConfig` 中,`inputSpec` 中的 `inputFormat` 必须设置为 `org.apache.druid.data.input.parquet.DruidParquetInputFormat`
Parquet Hadoop 解析器支持自动字段发现,如果提供了一个带有 `parquet` `parquetSpec``flattenSpec` 也支持展平。 Parquet嵌套 list 和 map [逻辑类型](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md) 应与所有受支持类型的JSON path表达式一起正确操作。
| 字段 | 类型 | 描述 | 是否必填 |
|-|-|-|-|
| type | String | 应该填 `parquet` | 是 |
| parseSpec | JSON对象 | 指定数据的时间戳和维度和一个可选的 `flattenSpec`。有效的 `parseSpec` 格式是 `timeAndDims``parquet` | 是 |
| binaryAsString | 布尔类型 | 指定逻辑上未标记为字符串的二进制orc列是否应被视为UTF-8编码字符串。 | 否默认为false |
当时间维度是一个 [date类型的列](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md), 则无需指定一个格式。 当格式为UTF8的String 则要么指定为 `auto`,或者显式的指定一个 [时间格式](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html)。
**Parquet Hadoop解析器 vs Parquet Avro Hadoop解析器**
两者都是从Parquet文件中读取但是又轻微的不同。主要不同之处是
* Parquet Hadoop解析器使用简单的转换而Parquet Avro Hadoop解析器首先使用 `parquet-avro` 库将Parquet数据转换为Avro记录然后使用 `druid-avro-extensions` 模块将Avro数据解析为druid
* Parquet Hadoop解析器将Hadoop作业属性 `parquet.avro.add-list-element-records` 设置为false通常默认为true以便将原始列表元素"展开"为多值维度
* Parquet Hadoop解析器支持 `int96` Parquet值而 Parquet Avro Hadoop解析器不支持。`flatteSpec` 的JSON path表达式求值的行为也可能存在一些细微的差异
基于这些差异我们建议在Parquet avro hadoop解析器上使用Parquet Hadoop解析器以允许摄取超出Avro转换模式约束的数据。然而Parquet Avro Hadoop解析器是支持Parquet格式的原始基础因此它更加成熟。
**示例**
`parquet` parser, `parquet` parseSpec
```
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat",
"paths": "path/to/file.parquet"
},
...
},
"dataSchema": {
"dataSource": "example",
"parser": {
"type": "parquet",
"parseSpec": {
"format": "parquet",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "nestedDim",
"expr": "$.nestedData.dim1"
},
{
"type": "path",
"name": "listDimFirstItem",
"expr": "$.listDim[1]"
}
]
},
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
...
},
"tuningConfig": <hadoop-tuning-config>
}
}
}
```
`parquet` parser, `timeAndDims` parseSpec
```
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat",
"paths": "path/to/file.parquet"
},
...
},
"dataSchema": {
"dataSource": "example",
"parser": {
"type": "parquet",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"dim1",
"dim2",
"dim3",
"listDim"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
...
},
"tuningConfig": <hadoop-tuning-config>
}
}
```
#### Parquet Avro Hadoop Parser
> [!WARNING]
> 考虑在该解析器之上使用 [Parquet Hadoop Parser](#parquet-hadoop-parser) 来摄取Parquet文件。 两者之间的不同之处参见 [Parquet Hadoop解析器 vs Parquet Avro Hadoop解析器]() 部分
> [!WARNING]
> 使用Parquet Avro Hadoop Parser需要同时加入 [druid-parquet-extensions](../Development/parquet-extensions.md) 和 [druid-avro-extensions](../Development/avro-extensions.md)
该解析器用于 [Hadoop批摄取](hadoopbased.md), 该解析器首先将Parquet数据转换为Avro记录然后再解析它们后摄入到Druid。在 `ioConfig` 中,`inputSpec` 中的 `inputFormat` 必须设置为 `org.apache.druid.data.input.parquet.DruidParquetAvroInputFormat`
Parquet Avro Hadoop 解析器支持自动字段发现,如果提供了一个带有 `avro` `parquetSpec``flattenSpec` 也支持展平。 Parquet嵌套 list 和 map [逻辑类型](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md) 应与所有受支持类型的JSON path表达式一起正确操作。该解析器将Hadoop作业属性 `parquet.avro.add-list-element-records` 设置为false通常默认为true以便将原始列表元素"展开"为多值维度。
注意,`int96` Parquet值类型在该解析器中是不支持的。
| 字段 | 类型 | 描述 | 是否必填 |
|-|-|-|-|
| type | String | 应该填 `parquet-avro` | 是 |
| parseSpec | JSON对象 | 指定数据的时间戳和维度和一个可选的 `flattenSpec`, 应该是 `avro` | 是 |
| binaryAsString | 布尔类型 | 指定逻辑上未标记为字符串的二进制orc列是否应被视为UTF-8编码字符串。 | 否默认为false |
当时间维度是一个 [date类型的列](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md), 则无需指定一个格式。 当格式为UTF8的String 则要么指定为 `auto`,或者显式的指定一个 [时间格式](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html)。
**示例**
```
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.druid.data.input.parquet.DruidParquetAvroInputFormat",
"paths": "path/to/file.parquet"
},
...
},
"dataSchema": {
"dataSource": "example",
"parser": {
"type": "parquet-avro",
"parseSpec": {
"format": "avro",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "nestedDim",
"expr": "$.nestedData.dim1"
},
{
"type": "path",
"name": "listDimFirstItem",
"expr": "$.listDim[1]"
}
]
},
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
...
},
"tuningConfig": <hadoop-tuning-config>
}
}
}
```
#### Avro Stream Parser
> [!WARNING]
> 需要添加 [druid-avro-extensions](../Development/avro-extensions.md) 来使用Avro Stream解析器
该解析器用于 [流式摄取](streamingest.md), 直接从一个流来读取数据。
| 字段 | 类型 | 描述 | 是否必须 |
|-|-|-|-|
| type | String | `avro_stream` | 否 |
| avroBytesDecoder | JSON对象 | 指定如何对Avro记录进行解码 | 是 |
| parseSpec | JSON对象 | 指定数据的时间戳和维度。 应该是一个 `avro` parseSpec | 是 |
Avro parseSpec包含一个使用"root"或者"path"类型的 [`flattenSpec`](ingestion.md#flattenspec.md), 以便可以用来读取嵌套的avro数据。 "jq"类型在Avro中目前还不支持。
以下示例展示了一个具有**schema repo**avro解码器的 `Avro stream parser`:
```
"parser" : {
"type" : "avro_stream",
"avroBytesDecoder" : {
"type" : "schema_repo",
"subjectAndIdConverter" : {
"type" : "avro_1124",
"topic" : "${YOUR_TOPIC}"
},
"schemaRepository" : {
"type" : "avro_1124_rest_client",
"url" : "${YOUR_SCHEMA_REPO_END_POINT}",
}
},
"parseSpec" : {
"format": "avro",
"timestampSpec": <standard timestampSpec>,
"dimensionsSpec": <standard dimensionsSpec>,
"flattenSpec": <optional>
}
}
```
**Avro Bytes Decoder**
如果 `type` 未被指定, `avroBytesDecoder` 默认使用 `schema_repo`
**基于Avro Bytes Decoder的 `inline schema`**
> [!WARNING]
> "schema_inline"解码器使用固定schema读取Avro记录不支持schema迁移。如果将来可能需要迁移schema请考虑其他解码器之一所有解码器都使用一个消息头该消息头允许解析器识别正确的Avro schema以读取记录。
如果可以使用同一schema读取所有输入事件则可以使用此解码器。在这种情况下在输入任务JSON本身中指定schema如下所述:
```
...
"avroBytesDecoder": {
"type": "schema_inline",
"schema": {
//your schema goes here, for example
"namespace": "org.apache.druid.data",
"name": "User",
"type": "record",
"fields": [
{ "name": "FullName", "type": "string" },
{ "name": "Country", "type": "string" }
]
}
}
...
```
**基于Avro Bytes Decoder的 `multiple inline schemas`**
如果不同的输入事件可以有不同的读取schema请使用此解码器。在这种情况下在输入任务JSON本身中指定schema如下所述:
```
...
"avroBytesDecoder": {
"type": "multiple_schemas_inline",
"schemas": {
//your id -> schema map goes here, for example
"1": {
"namespace": "org.apache.druid.data",
"name": "User",
"type": "record",
"fields": [
{ "name": "FullName", "type": "string" },
{ "name": "Country", "type": "string" }
]
},
"2": {
"namespace": "org.apache.druid.otherdata",
"name": "UserIdentity",
"type": "record",
"fields": [
{ "name": "Name", "type": "string" },
{ "name": "Location", "type": "string" }
]
},
...
...
}
}
...
```
注意它本质上是一个整数Schema ID到avro schema对象的映射。此解析器假定记录具有以下格式。第一个1字节是版本必须始终为1, 接下来的4个字节是使用大端字节顺序序列化的整数模式ID。其余字节包含序列化的avro消息。
**基于Avro Bytes Decoder的 `SchemaRepo`**
#### Protobuf Parser
### ParseSpec
#### JSON解析规范