Docs: updating Kafka input format docs (#14049)

* updating Kafka input format docs

* typo

* spellcheck

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/development/extensions-core/kafka-ingestion.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/data-formats.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/data-formats.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/data-formats.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/data-formats.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/data-formats.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Update docs/ingestion/data-formats.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

---------

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>
This commit is contained in:
Vadim Ogievetsky 2023-04-11 20:06:23 -07:00 committed by GitHub
parent 29652bd246
commit 3a7e4efdd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 180 additions and 143 deletions

View File

@ -63,6 +63,7 @@ For a full description of all the fields and parameters in a Kafka supervisor sp
The following sections contain examples to help you get started with supervisor specs.
### JSON input format supervisor spec example
The following example demonstrates a supervisor spec for Kafka that uses the `JSON` input format. In this case Druid parses the event contents in JSON format:
```json
@ -131,40 +132,83 @@ The following example demonstrates a supervisor spec for Kafka that uses the `JS
### Kafka input format supervisor spec example
If you want to ingest data from other fields in addition to the Kafka message contents, you can use the `kafka` input format. The `kafka` input format lets you ingest:
- the event key field
- event headers
- the Kafka event timestamp
- the Kafka event value that stores the payload.
If you want to parse the Kafka metadata fields in addition to the Kafka payload value contents, you can use the `kafka` input format.
For example, consider the following structure for a message that represents a fictitious wiki edit in a development environment:
- **Event headers**: {"environment": "development"}
- **Event key**: {"key: "wiki-edit"}
- **Event value**: \<JSON object with event payload containing the change details\>
- **Event timestamp**: "Nov. 10, 2021 at 14:06"
The `kafka` input format wraps around the payload parsing input format and augments the data it outputs with the Kafka event timestamp,
the Kafka event headers, and the key field that itself can be parsed using any available InputFormat.
When you use the `kafka` input format, you configure the way that Druid names the dimensions created from the Kafka message:
- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any conflicts with named dimensions. The default is `kafka.header`. Considering the header from the example, Druid maps the header to the following column: `kafka.header.environment`.
- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the Druid schema to avoid conflicts with other time columns. The default is `kafka.timestamp`.
- `keyColumnName`: Supply the name for the Kafka key column in Druid. The default is `kafka.key`.
Additionally, you must provide information about how Druid should parse the data in the Kafka message:
For example, consider the following structure for a Kafka message that represents a fictitious wiki edit in a development environment:
- **Kafka timestamp**: `1680795276351`
- **Kafka headers**:
- `env=development`
- `zone=z1`
- **Kafka key**: `wiki-edit`
- **Kafka payload value**: `{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}`
Using `{ "type": "json" }` as the input format would only parse the payload value.
To parse the Kafka metadata in addition to the payload, use the `kafka` input format.
You would configure it as follows:
- `valueFormat`: Define how to parse the payload value. Set this to the payload parsing input format (`{ "type": "json" }`).
- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.timestamp`.
- `headerFormat`: The default "string" decodes UTF8-encoded strings from the Kafka header. If you need another format, you can implement your own parser.
- `keyFormat`: Takes a Druid `inputFormat` and uses the value for the first key it finds. According to the example the value is "wiki-edit". It discards the key name in this case. If you store the key as a string, use the `CSV` input format. For example, if you have simple string for the the key `wiki-edit`, you can use the following to parse the key:
- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any conflicts with columns from the payload. The default is `kafka.header.`.
Considering the header from the example, Druid maps the headers to the following columns: `kafka.header.env`, `kafka.header.zone`.
- `keyFormat`: Supply an input format to parse the key. Only the first value will be used.
If, as in the example, your key values are simple strings, then you can use the `tsv` format to parse them.
```
"keyFormat": {
"type": "csv",
"hasHeaderRow": false,
{
"type": "tsv",
"findColumnsFromHeader": false,
"columns": ["key"]
}
"columns": ["x"]
}
```
- `valueFormat`: Define how to parse the message contents. You can use any of the Druid input formats that work for Kafka.
Note that for `tsv`,`csv`, and `regex` formats, you need to provide a `columns` array to make a valid input format. Only the first one is used, and its name will be ignored in favor of `keyColumnName`.
- `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts with columns from the payload. The default is `kafka.key`.
Putting it together, the following input format (that uses the default values for `timestampColumnName`, `headerColumnPrefix`, and `keyColumnName`)
```json
{
"type": "kafka",
"valueFormat": {
"type": "json"
},
"headerFormat": {
"type": "string"
},
"keyFormat": {
"type": "tsv",
"findColumnsFromHeader": false,
"columns": ["x"]
}
}
```
would parse the example message as follows:
```json
{
"channel": "#sv.wikipedia",
"timestamp": "2016-06-27T00:00:11.080Z",
"page": "Salo Toraut",
"delta": 31,
"namespace": "Main",
"kafka.timestamp": 1680795276351,
"kafka.header.env": "development",
"kafka.header.zone": "z1",
"kafka.key": "wiki-edit"
}
```
For more information on data formats, see [Data formats](../../ingestion/data-formats.md).
Finally, add the Kafka message columns to the `dimensionsSpec`. For the key and timestamp, you can use the dimension names you defined for `keyColumnName` and `timestampColumnName`. For header dimensions, append the header key to the `headerColumnPrefix`. For example `kafka.header.environment`.
Finally, add these Kafka metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to auto-detect columns.
The following supervisor spec demonstrates how to ingest the Kafka header, key, and timestamp into Druid dimensions:
```
{
"type": "kafka",
@ -177,84 +221,61 @@ The following supervisor spec demonstrates how to ingest the Kafka header, key,
"topic": "wiki-edits",
"inputFormat": {
"type": "kafka",
"headerColumnPrefix": "kafka.header.",
"timestampColumnName": "kafka.timestamp",
"keyColumnName": "kafka.key",
"valueFormat": {
"type": "json"
},
"headerFormat": {
"type": "string"
},
"keyFormat": {
"type": "json"
},
"valueFormat": {
"type": "json"
},
"findColumnsFromHeader": false
"type": "tsv",
"findColumnsFromHeader": false,
"columns": ["x"]
}
},
"useEarliestOffset": true
},
"tuningConfig": {
"type": "kafka"
},
"dataSchema": {
"dataSource": "wikiticker",
"timestampSpec": {
"column": "timestamp",
"format": "posix"
},
"dimensionsSpec": {
"dimensions": [
{
"type": "string",
"name": "kafka.key"
},
{
"type": "string",
"name": "kafka.timestamp"
},
{
"type": "string",
"name": "kafka.header.environment"
},
"$schema",
{
"type": "long",
"name": "id"
},
"type",
{
"type": "long",
"name": "namespace"
},
"title",
"comment",
"user",]
]
"dimensionsSpec": "dimensionsSpec": {
"useSchemaDiscovery": true,
"includeAllDimensions": true
},
"granularitySpec": {
"queryGranularity": "none",
"rollup": false,
"segmentGranularity": "day"
}
},
"tuningConfig": {
"type": "kafka"
}
},
"tuningConfig": {
"type": "kafka"
}
}
```
After Druid ingests the data, you can query the Kafka message columns as follows:
```unix
After Druid ingests the data, you can query the Kafka metadata columns as follows:
```sql
SELECT
"kafka.header.environment",
"kafka.header.env",
"kafka.key",
"kafka.timestamp"
FROM "wikiticker"
kafka.header.environment kafka.key kafka.timestamp
development wiki-edit 1636399229823
```
This query returns:
| `kafka.header.env` | `kafka.key` | `kafka.timestamp` |
|--------------------|-----------|---------------|
| `development` | `wiki-edit` | `1680795276351` |
For more information, see [`kafka` data format](../../ingestion/data-formats.md#kafka).
## Submit a supervisor spec
Druid starts a supervisor for a dataSource when you submit a supervisor spec. You can use the data loader in the web console or you can submit a supervisor spec to the following endpoint:

View File

@ -175,8 +175,9 @@ Kafka indexing service supports both [`inputFormat`](../../ingestion/data-format
Use the `inputFormat` to specify the data format for Kafka indexing service unless you need a format only supported by the legacy `parser`.
Supported `inputFormat`s include:
- `csv`
- `delimited`
- `tsv`
- `json`
- `kafka`
- `avro_stream`

View File

@ -150,7 +150,7 @@ Configure the TSV `inputFormat` to load TSV data as follows:
Be sure to change the `delimiter` to the appropriate delimiter for your data. Like CSV, you must specify the columns and which subset of the columns you want indexed.
For example:
For example:
```json
"ioConfig": {
@ -163,72 +163,6 @@ Be sure to change the `delimiter` to the appropriate delimiter for your data. Li
}
```
### Kafka
Configure the Kafka `inputFormat` to load complete kafka records including header, key, and value.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| `type` | String | Set value to `kafka`. | yes |
| `headerColumnPrefix` | String | Custom prefix for all the header columns. | no (default = "kafka.header.") |
| `timestampColumnName` | String | Name of the column for the kafka record's timestamp.| no (default = "kafka.timestamp") |
| `keyColumnName` | String | Name of the column for the kafka record's key.| no (default = "kafka.key") |
| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka headers. Supports String types. Because Kafka header values are bytes, the parser decodes them as UTF-8 encoded strings. To change this behavior, implement your own parser based on the encoding style. Change the 'encoding' type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
| `keyFormat` | [InputFormat](#input-format) | Any existing `inputFormat` used to parse the Kafka key. It only processes the first entry of the input format. For details, see [Specifying data format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format). | no |
| `valueFormat` | [InputFormat](#input-format) | `valueFormat` can be any existing `inputFormat` to parse the Kafka value payload. For details about specifying the input format, see [Specifying data format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format). | yes |
For example:
```
"ioConfig": {
"inputFormat": {
"type": "kafka",
"headerColumnPrefix": "kafka.header.",
"timestampColumnName": "kafka.timestamp",
"keyColumnName": "kafka.key",
"headerFormat":
{
"type": "string"
},
"keyFormat":
{
"type": "json"
},
"valueFormat":
{
"type": "json"
}
},
...
}
```
Note the following behaviors:
- If there are conflicts between column names, Druid uses the column names from the payload and ignores the column name from the header or key. This behavior makes it easier to migrate to the the Kafka `inputFormat` from another Kafka ingestion spec without losing data.
- The Kafka input format fundamentally blends information from the header, key, and value objects from a Kafka record to create a row in Druid. It extracts individual records from the value. Then it augments each value with the corresponding key or header columns.
- The Kafka input format by default exposes Kafka timestamp `timestampColumnName` to make it available for use as the primary timestamp column. Alternatively you can choose timestamp column from either the key or value payload.
For example, the following `timestampSpec` uses the default Kafka timestamp from the Kafka record:
```json
"timestampSpec":
{
"column": "kafka.timestamp",
"format": "millis"
}
```
If you are using "kafka.header." as the prefix for Kafka header columns and there is a timestamp field in the header, the header timestamp serves as the primary timestamp column. For example:
```json
"timestampSpec":
{
"column": "kafka.header.timestamp",
"format": "millis"
}
```
### ORC
To use the ORC input format, load the Druid Orc extension ( [`druid-orc-extensions`](../development/extensions-core/orc.md)).
@ -604,7 +538,88 @@ For example:
}
```
### FlattenSpec
### Kafka
`kafka` is a special input format that wraps a regular input format (which goes in `valueFormat`) and allows you
to parse the Kafka metadata (timestamp, headers, and key) that is part of Kafka messages.
It should only be used when ingesting from Apache Kafka.
Configure the Kafka `inputFormat` as follows:
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| `type` | String | Set value to `kafka`. | yes |
| `valueFormat` | [InputFormat](#input-format) | Any [InputFormat](#input-format) to parse the Kafka value payload. For details about specifying the input format, see [Specifying data format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format). | yes |
| `timestampColumnName` | String | Name of the column for the kafka record's timestamp.| no (default = "kafka.timestamp") |
| `headerColumnPrefix` | String | Custom prefix for all the header columns. | no (default = "kafka.header.") |
| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka headers. Supports String types. Because Kafka header values are bytes, the parser decodes them as UTF-8 encoded strings. To change this behavior, implement your own parser based on the encoding style. Change the 'encoding' type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
| `keyFormat` | [InputFormat](#input-format) | Any [input format](#input-format) to parse the Kafka key. It only processes the first entry of the `inputFormat` field. For details, see [Specifying data format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format). | no |
| `keyColumnName` | String | Name of the column for the kafka record's key.| no (default = "kafka.key") |
The Kafka input format augments the payload with information from the Kafka timestamp, headers, and key.
If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence.
This ensures that upgrading a Kafka ingestion to use the Kafka input format (by taking its existing input format and setting it as the `valueFormat`) can be done without losing any of the payload data.
Here is a minimal example that only augments the parsed payload with the Kafka timestamp column:
```
"ioConfig": {
"inputFormat": {
"type": "kafka",
"valueFormat": {
"type": "json"
}
},
...
}
```
Here is a complete example:
```
"ioConfig": {
"inputFormat": {
"type": "kafka",
"valueFormat": {
"type": "json"
}
"timestampColumnName": "kafka.timestamp",
"headerFormat": {
"type": "string",
"encoding": "UTF-8"
},
"headerColumnPrefix": "kafka.header.",
"keyFormat": {
"type": "tsv",
"findColumnsFromHeader": false,
"columns": ["x"]
},
"keyColumnName": "kafka.key",
},
...
}
```
If you want to use `kafka.timestamp` as Druid's primary timestamp (`__time`), specify it as the value for `column` in the `timestampSpec`:
```json
"timestampSpec": {
"column": "kafka.timestamp",
"format": "millis"
}
```
Similarly, if you want to use a timestamp extracted from the Kafka header:
```json
"timestampSpec": {
"column": "kafka.header.myTimestampHeader",
"format": "millis"
}
```
## FlattenSpec
You can use the `flattenSpec` object to flatten nested data, as an alternative to the Druid [nested columns](../querying/nested-columns.md) feature, and for nested input formats unsupported by the feature. It is an object within the `inputFormat` object.
@ -635,7 +650,7 @@ After Druid reads the input data records, it applies the flattenSpec before appl
Flattening is only supported for [data formats](data-formats.md) that support nesting, including `avro`, `json`, `orc`, and `parquet`.
#### Field flattening specifications
### Field flattening specifications
Each entry in the `fields` list can have the following components:
@ -646,7 +661,7 @@ Each entry in the `fields` list can have the following components:
| expr | Expression for accessing the field while flattening. For type `path`, this should be [JsonPath](https://github.com/jayway/JsonPath). For type `jq`, this should be [jackson-jq](https://github.com/eiiches/jackson-jq) notation. For other types, this parameter is ignored. | none (required for types `path` and `jq`) |
| nodes | For `tree` only. Multiple-expression field for accessing the field while flattening, representing the hierarchy of field names to read. For other types, this parameter must not be provided. | none (required for type `tree`) |
#### Notes on flattening
### Notes on flattening
- For convenience, when defining a root-level field, it is possible to define only the field name, as a string, instead of a JSON object. For example, `{"name": "baz", "type": "root"}` is equivalent to `"baz"`.
- Enabling `useFieldDiscovery` will only automatically detect "simple" fields at the root level that correspond to data types that Druid supports. This includes strings, numbers, and lists of strings or numbers. Other types will not be automatically detected, and must be specified explicitly in the `fields` list.