mirror of https://github.com/apache/druid.git
Add docs for ingesting Kafka topic name (#14894)
Add documentation on how to extract the Kafka topic name and ingest it into the data.
This commit is contained in:
parent 54336e2a3e
commit 3c7b237c22
@@ -135,11 +135,12 @@ The following example demonstrates a supervisor spec for Kafka that uses the `JS

If you want to parse the Kafka metadata fields in addition to the Kafka payload value contents, you can use the `kafka` input format.
The `kafka` input format wraps around the payload parsing input format and augments the data it outputs with the Kafka event timestamp,
the Kafka topic name, the Kafka event headers, and the key field that itself can be parsed using any available InputFormat.

For example, consider the following structure for a Kafka message that represents a fictitious wiki edit in a development environment:

- **Kafka timestamp**: `1680795276351`
- **Kafka topic**: `wiki-edits`
- **Kafka headers**:
  - `env=development`
  - `zone=z1`
@@ -153,6 +154,7 @@ You would configure it as follows:

- `valueFormat`: Define how to parse the payload value. Set this to the payload parsing input format (`{ "type": "json" }`).
- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.timestamp`.
- `topicColumnName`: Supply a custom name for the Kafka topic in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.topic`. This field is useful when ingesting data from multiple topics into the same datasource.
- `headerFormat`: The default value `string` decodes strings in UTF-8 encoding from the Kafka header.
  Other supported encoding formats include the following:
  - `ISO-8859-1`: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1.
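As a sketch, a `headerFormat` that decodes headers with the `ISO-8859-1` charset listed above, instead of the default UTF-8, would look like the following fragment (field layout assumed to mirror the complete example later on this page):

```
"headerFormat": {
  "type": "string",
  "encoding": "ISO-8859-1"
}
```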
@@ -174,7 +176,7 @@ You would configure it as follows:

  Note that for `tsv`, `csv`, and `regex` formats, you need to provide a `columns` array to make a valid input format. Only the first one is used, and its name will be ignored in favor of `keyColumnName`.
- `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts with columns from the payload. The default is `kafka.key`.

Putting it together, the following input format (that uses the default values for `timestampColumnName`, `topicColumnName`, `headerColumnPrefix`, and `keyColumnName`)
```json
{
@@ -203,6 +205,7 @@ would parse the example message as follows:
  "delta": 31,
  "namespace": "Main",
  "kafka.timestamp": 1680795276351,
  "kafka.topic": "wiki-edits",
  "kafka.header.env": "development",
  "kafka.header.zone": "z1",
  "kafka.key": "wiki-edit"
@@ -213,7 +216,7 @@ For more information on data formats, see [Data formats](../../ingestion/data-fo

Finally, add these Kafka metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to auto-detect columns.

The following supervisor spec demonstrates how to ingest the Kafka header, key, timestamp, and topic into Druid dimensions:
```
{
@@ -270,15 +273,16 @@ After Druid ingests the data, you can query the Kafka metadata columns as follow

```
SELECT
  "kafka.header.env",
  "kafka.key",
  "kafka.timestamp",
  "kafka.topic"
FROM "wikiticker"
```
This query returns:

| `kafka.header.env` | `kafka.key` | `kafka.timestamp` | `kafka.topic` |
|--------------------|-------------|-------------------|---------------|
| `development` | `wiki-edit` | `1680795276351` | `wiki-edits` |
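When several topics feed the same datasource, the `kafka.topic` column distinguishes the origin of each row. A hypothetical query against the same `wikiticker` datasource (the alias `row_count` is illustrative):

```
SELECT
  "kafka.topic",
  COUNT(*) AS "row_count"
FROM "wikiticker"
GROUP BY "kafka.topic"
```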
For more information, see [`kafka` data format](../../ingestion/data-formats.md#kafka).
@@ -560,17 +560,19 @@ Configure the Kafka `inputFormat` as follows:

| `type` | String | Set value to `kafka`. | yes |
| `valueFormat` | [InputFormat](#input-format) | Any [InputFormat](#input-format) to parse the Kafka value payload. For details about specifying the input format, see [Specifying data format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format). | yes |
| `timestampColumnName` | String | Name of the column for the Kafka record's timestamp. | no (default = "kafka.timestamp") |
| `topicColumnName` | String | Name of the column for the Kafka record's topic. It is useful when ingesting data from multiple topics. | no (default = "kafka.topic") |
| `headerColumnPrefix` | String | Custom prefix for all the header columns. | no (default = "kafka.header.") |
| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka headers. Supports String types. Because Kafka header values are bytes, the parser decodes them as UTF-8 encoded strings. To change this behavior, implement your own parser based on the encoding style. Change the `encoding` type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
| `keyFormat` | [InputFormat](#input-format) | Any [input format](#input-format) to parse the Kafka key. It only processes the first entry of the `inputFormat` field. For details, see [Specifying data format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format). | no |
| `keyColumnName` | String | Name of the column for the Kafka record's key. | no (default = "kafka.key") |
The Kafka input format augments the payload with information from the Kafka timestamp, topic, headers, and key.

If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence.
This ensures that upgrading a Kafka ingestion to use the Kafka input format (by taking its existing input format and setting it as the `valueFormat`) can be done without losing any of the payload data.
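For instance, an ingestion whose existing input format is `{ "type": "json" }` (a hypothetical starting point) would move that object into `valueFormat` when switching to the `kafka` input format:

```
"inputFormat": {
  "type": "kafka",
  "valueFormat": {
    "type": "json"
  }
}
```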
Here is a minimal example that only augments the parsed payload with the Kafka timestamp and topic columns:

```
"ioConfig": {
@@ -594,6 +596,7 @@ Here is a complete example:
  "type": "json"
},
"timestampColumnName": "kafka.timestamp",
"topicColumnName": "kafka.topic",
"headerFormat": {
  "type": "string",
  "encoding": "UTF-8"
@@ -1258,6 +1258,7 @@ KafkaStringHeaderFormat
kafka.header.
kafka.key
kafka.timestamp
kafka.topic
keyColumnName
keyFormat
listDelimiter