From b22f19063a2b82f34660ca7db35699f58083b263 Mon Sep 17 00:00:00 2001 From: liujianhuan Date: Mon, 13 Apr 2020 16:25:03 +0800 Subject: [PATCH] dataformat part-6 --- DataIngestion/dataformats.md | 299 +++++++++++++++++++++++++++++++++++ 1 file changed, 299 insertions(+) diff --git a/DataIngestion/dataformats.md b/DataIngestion/dataformats.md index 8410846..4b2d9ec 100644 --- a/DataIngestion/dataformats.md +++ b/DataIngestion/dataformats.md @@ -550,8 +550,307 @@ Avro parseSpec可以包含使用"root"或"path"字段类型的 [flattenSpec](#fl ``` #### Parquet Hadoop Parser + +> [!WARNING] +> 需要添加 [druid-parquet-extensions](../Development/parquet-extensions.md) 来使用Parquet Hadoop解析器 + +该解析器用于 [Hadoop批摄取](hadoopbased.md)。在 `ioConfig` 中,`inputSpec` 中的 `inputFormat` 必须设置为 `org.apache.druid.data.input.parquet.DruidParquetInputFormat`。 + +Parquet Hadoop 解析器支持自动字段发现,如果提供了一个带有 `parquet` `parquetSpec`的 `flattenSpec` 也支持展平。 Parquet嵌套 list 和 map [逻辑类型](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md) 应与所有受支持类型的JSON path表达式一起正确操作。 + +| 字段 | 类型 | 描述 | 是否必填 | +|-|-|-|-| +| type | String | 应该填 `parquet` | 是 | +| parseSpec | JSON对象 | 指定数据的时间戳和维度和一个可选的 `flattenSpec`。有效的 `parseSpec` 格式是 `timeAndDims` 和 `parquet` | 是 | +| binaryAsString | 布尔类型 | 指定逻辑上未标记为字符串的二进制orc列是否应被视为UTF-8编码字符串。 | 否(默认为false) | + +当时间维度是一个 [date类型的列](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md), 则无需指定一个格式。 当格式为UTF8的String, 则要么指定为 `auto`,或者显式的指定一个 [时间格式](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html)。 + +**Parquet Hadoop解析器 vs Parquet Avro Hadoop解析器** +两者都是从Parquet文件中读取,但是又轻微的不同。主要不同之处是: +* Parquet Hadoop解析器使用简单的转换,而Parquet Avro Hadoop解析器首先使用 `parquet-avro` 库将Parquet数据转换为Avro记录,然后使用 `druid-avro-extensions` 模块将Avro数据解析为druid +* Parquet Hadoop解析器将Hadoop作业属性 `parquet.avro.add-list-element-records` 设置为false(通常默认为true),以便将原始列表元素"展开"为多值维度 +* Parquet Hadoop解析器支持 `int96` Parquet值,而 Parquet Avro Hadoop解析器不支持。`flatteSpec` 的JSON path表达式求值的行为也可能存在一些细微的差异 + +基于这些差异,我们建议在Parquet avro hadoop解析器上使用Parquet Hadoop解析器,以允许摄取超出Avro转换模式约束的数据。然而,Parquet Avro Hadoop解析器是支持Parquet格式的原始基础,因此它更加成熟。 + +**示例** + +`parquet` parser, `parquet` parseSpec +``` +{ + "type": "index_hadoop", + "spec": { + "ioConfig": { + "type": "hadoop", + "inputSpec": { + "type": "static", + "inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat", + "paths": "path/to/file.parquet" + }, + ... + }, + "dataSchema": { + "dataSource": "example", + "parser": { + "type": "parquet", + "parseSpec": { + "format": "parquet", + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [ + { + "type": "path", + "name": "nestedDim", + "expr": "$.nestedData.dim1" + }, + { + "type": "path", + "name": "listDimFirstItem", + "expr": "$.listDim[1]" + } + ] + }, + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [], + "dimensionExclusions": [], + "spatialDimensions": [] + } + } + }, + ... + }, + "tuningConfig": + } + } +} +``` +`parquet` parser, `timeAndDims` parseSpec +``` +{ + "type": "index_hadoop", + "spec": { + "ioConfig": { + "type": "hadoop", + "inputSpec": { + "type": "static", + "inputFormat": "org.apache.druid.data.input.parquet.DruidParquetInputFormat", + "paths": "path/to/file.parquet" + }, + ... + }, + "dataSchema": { + "dataSource": "example", + "parser": { + "type": "parquet", + "parseSpec": { + "format": "timeAndDims", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [ + "dim1", + "dim2", + "dim3", + "listDim" + ], + "dimensionExclusions": [], + "spatialDimensions": [] + } + } + }, + ... + }, + "tuningConfig": + } +} +``` + #### Parquet Avro Hadoop Parser + +> [!WARNING] +> 考虑在该解析器之上使用 [Parquet Hadoop Parser](#parquet-hadoop-parser) 来摄取Parquet文件。 两者之间的不同之处参见 [Parquet Hadoop解析器 vs Parquet Avro Hadoop解析器]() 部分 + +> [!WARNING] +> 使用Parquet Avro Hadoop Parser需要同时加入 [druid-parquet-extensions](../Development/parquet-extensions.md) 和 [druid-avro-extensions](../Development/avro-extensions.md) + +该解析器用于 [Hadoop批摄取](hadoopbased.md), 该解析器首先将Parquet数据转换为Avro记录,然后再解析它们后摄入到Druid。在 `ioConfig` 中,`inputSpec` 中的 `inputFormat` 必须设置为 `org.apache.druid.data.input.parquet.DruidParquetAvroInputFormat`。 + +Parquet Avro Hadoop 解析器支持自动字段发现,如果提供了一个带有 `avro` `parquetSpec`的 `flattenSpec` 也支持展平。 Parquet嵌套 list 和 map [逻辑类型](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md) 应与所有受支持类型的JSON path表达式一起正确操作。该解析器将Hadoop作业属性 `parquet.avro.add-list-element-records` 设置为false(通常默认为true),以便将原始列表元素"展开"为多值维度。 + +注意,`int96` Parquet值类型在该解析器中是不支持的。 + +| 字段 | 类型 | 描述 | 是否必填 | +|-|-|-|-| +| type | String | 应该填 `parquet-avro` | 是 | +| parseSpec | JSON对象 | 指定数据的时间戳和维度和一个可选的 `flattenSpec`, 应该是 `avro` | 是 | +| binaryAsString | 布尔类型 | 指定逻辑上未标记为字符串的二进制orc列是否应被视为UTF-8编码字符串。 | 否(默认为false) | + +当时间维度是一个 [date类型的列](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md), 则无需指定一个格式。 当格式为UTF8的String, 则要么指定为 `auto`,或者显式的指定一个 [时间格式](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html)。 + +**示例** +``` +{ + "type": "index_hadoop", + "spec": { + "ioConfig": { + "type": "hadoop", + "inputSpec": { + "type": "static", + "inputFormat": "org.apache.druid.data.input.parquet.DruidParquetAvroInputFormat", + "paths": "path/to/file.parquet" + }, + ... + }, + "dataSchema": { + "dataSource": "example", + "parser": { + "type": "parquet-avro", + "parseSpec": { + "format": "avro", + "flattenSpec": { + "useFieldDiscovery": true, + "fields": [ + { + "type": "path", + "name": "nestedDim", + "expr": "$.nestedData.dim1" + }, + { + "type": "path", + "name": "listDimFirstItem", + "expr": "$.listDim[1]" + } + ] + }, + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [], + "dimensionExclusions": [], + "spatialDimensions": [] + } + } + }, + ... + }, + "tuningConfig": + } + } +} +``` + #### Avro Stream Parser + +> [!WARNING] +> 需要添加 [druid-avro-extensions](../Development/avro-extensions.md) 来使用Avro Stream解析器 + +该解析器用于 [流式摄取](streamingest.md), 直接从一个流来读取数据。 + +| 字段 | 类型 | 描述 | 是否必须 | +|-|-|-|-| +| type | String | `avro_stream` | 否 | +| avroBytesDecoder | JSON对象 | 指定如何对Avro记录进行解码 | 是 | +| parseSpec | JSON对象 | 指定数据的时间戳和维度。 应该是一个 `avro` parseSpec | 是 | + +Avro parseSpec包含一个使用"root"或者"path"类型的 [`flattenSpec`](ingestion.md#flattenspec.md), 以便可以用来读取嵌套的avro数据。 "jq"类型在Avro中目前还不支持。 + +以下示例展示了一个具有**schema repo**avro解码器的 `Avro stream parser`: +``` +"parser" : { + "type" : "avro_stream", + "avroBytesDecoder" : { + "type" : "schema_repo", + "subjectAndIdConverter" : { + "type" : "avro_1124", + "topic" : "${YOUR_TOPIC}" + }, + "schemaRepository" : { + "type" : "avro_1124_rest_client", + "url" : "${YOUR_SCHEMA_REPO_END_POINT}", + } + }, + "parseSpec" : { + "format": "avro", + "timestampSpec": , + "dimensionsSpec": , + "flattenSpec": + } +} +``` + +**Avro Bytes Decoder** + +如果 `type` 未被指定, `avroBytesDecoder` 默认使用 `schema_repo`。 + +**基于Avro Bytes Decoder的 `inline schema`** + +> [!WARNING] +> "schema_inline"解码器使用固定schema读取Avro记录,不支持schema迁移。如果将来可能需要迁移schema,请考虑其他解码器之一,所有解码器都使用一个消息头,该消息头允许解析器识别正确的Avro schema以读取记录。 + +如果可以使用同一schema读取所有输入事件,则可以使用此解码器。在这种情况下,在输入任务JSON本身中指定schema,如下所述: +``` +... +"avroBytesDecoder": { + "type": "schema_inline", + "schema": { + //your schema goes here, for example + "namespace": "org.apache.druid.data", + "name": "User", + "type": "record", + "fields": [ + { "name": "FullName", "type": "string" }, + { "name": "Country", "type": "string" } + ] + } +} +... +``` +**基于Avro Bytes Decoder的 `multiple inline schemas`** + +如果不同的输入事件可以有不同的读取schema,请使用此解码器。在这种情况下,在输入任务JSON本身中指定schema,如下所述: +``` +... +"avroBytesDecoder": { + "type": "multiple_schemas_inline", + "schemas": { + //your id -> schema map goes here, for example + "1": { + "namespace": "org.apache.druid.data", + "name": "User", + "type": "record", + "fields": [ + { "name": "FullName", "type": "string" }, + { "name": "Country", "type": "string" } + ] + }, + "2": { + "namespace": "org.apache.druid.otherdata", + "name": "UserIdentity", + "type": "record", + "fields": [ + { "name": "Name", "type": "string" }, + { "name": "Location", "type": "string" } + ] + }, + ... + ... + } +} +... +``` +注意,它本质上是一个整数Schema ID到avro schema对象的映射。此解析器假定记录具有以下格式。第一个1字节是版本,必须始终为1, 接下来的4个字节是使用大端字节顺序序列化的整数模式ID。其余字节包含序列化的avro消息。 + +**基于Avro Bytes Decoder的 `SchemaRepo`** + #### Protobuf Parser ### ParseSpec #### JSON解析规范