From 3c7b237c22505e195c4c6278c4123707755da7e4 Mon Sep 17 00:00:00 2001
From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
Date: Thu, 24 Aug 2023 19:19:59 +0530
Subject: [PATCH] Add docs for ingesting Kafka topic name (#14894)

Add documentation on how to extract the Kafka topic name and ingest it
into the data.
---
 .../extensions-core/kafka-ingestion.md | 18 +++++++++++-------
 docs/ingestion/data-formats.md         |  5 ++++-
 website/.spelling                      |  1 +
 3 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md
index 46426e55f27..329967747bf 100644
--- a/docs/development/extensions-core/kafka-ingestion.md
+++ b/docs/development/extensions-core/kafka-ingestion.md
@@ -135,11 +135,12 @@ The following example demonstrates a supervisor spec for Kafka that uses the `JS
 If you want to parse the Kafka metadata fields in addition to the Kafka payload value contents, you can use the `kafka` input format.
 
 The `kafka` input format wraps around the payload parsing input format and augments the data it outputs with the Kafka event timestamp,
-the Kafka event headers, and the key field that itself can be parsed using any available InputFormat.
+the Kafka topic name, the Kafka event headers, and the key field that itself can be parsed using any available InputFormat.
 
 For example, consider the following structure for a Kafka message that represents a fictitious wiki edit in a development environment:
 
 - **Kafka timestamp**: `1680795276351`
+- **Kafka topic**: `wiki-edits`
 - **Kafka headers**:
   - `env=development`
   - `zone=z1`
@@ -153,6 +154,7 @@ You would configure it as follows:
 
 - `valueFormat`: Define how to parse the payload value. Set this to the payload parsing input format (`{ "type": "json" }`).
 - `timestampColumnName`: Supply a custom name for the Kafka timestamp in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.timestamp`.
+- `topicColumnName`: Supply a custom name for the Kafka topic in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.topic`. This field is useful when ingesting data from multiple topics into the same datasource.
 - `headerFormat`: The default value `string` decodes strings in UTF-8 encoding from the Kafka header.
   Other supported encoding formats include the following:
   - `ISO-8859-1`: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1.
@@ -174,7 +176,7 @@ You would configure it as follows:
   Note that for `tsv`,`csv`, and `regex` formats, you need to provide a `columns` array to make a valid input format. Only the first one is used, and its name will be ignored in favor of `keyColumnName`.
 - `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts with columns from the payload. The default is `kafka.key`.
 
-Putting it together, the following input format (that uses the default values for `timestampColumnName`, `headerColumnPrefix`, and `keyColumnName`)
+Putting it together, the following input format (that uses the default values for `timestampColumnName`, `topicColumnName`, `headerColumnPrefix`, and `keyColumnName`)
 
 ```json
 {
@@ -203,6 +205,7 @@ would parse the example message as follows:
   "delta": 31,
   "namespace": "Main",
   "kafka.timestamp": 1680795276351,
+  "kafka.topic": "wiki-edits",
   "kafka.header.env": "development",
   "kafka.header.zone": "z1",
   "kafka.key": "wiki-edit"
@@ -213,7 +216,7 @@ For more information on data formats, see [Data formats](../../ingestion/data-fo
 
 Finally, add these Kafka metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to auto-detect columns.
 
-The following supervisor spec demonstrates how to ingest the Kafka header, key, and timestamp into Druid dimensions:
+The following supervisor spec demonstrates how to ingest the Kafka header, key, timestamp, and topic into Druid dimensions:
 
 ```
 {
@@ -270,15 +273,16 @@ After Druid ingests the data, you can query the Kafka metadata columns as follow
 SELECT
   "kafka.header.env",
   "kafka.key",
-  "kafka.timestamp"
+  "kafka.timestamp",
+  "kafka.topic"
 FROM "wikiticker"
 ```
 
 This query returns:
 
-| `kafka.header.env` | `kafka.key` | `kafka.timestamp` |
-|--------------------|-----------|---------------|
-| `development` | `wiki-edit` | `1680795276351` |
+| `kafka.header.env` | `kafka.key` | `kafka.timestamp` | `kafka.topic` |
+|--------------------|-----------|---------------|---------------|
+| `development` | `wiki-edit` | `1680795276351` | `wiki-edits` |
 
 For more information, see [`kafka` data format](../../ingestion/data-formats.md#kafka).
diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index e3ad22a9163..7dd1b10c7fa 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -560,17 +560,19 @@ Configure the Kafka `inputFormat` as follows:
 | `type` | String | Set value to `kafka`. | yes |
 | `valueFormat` | [InputFormat](#input-format) | Any [InputFormat](#input-format) to parse the Kafka value payload. For details about specifying the input format, see [Specifying data format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format). | yes |
 | `timestampColumnName` | String | Name of the column for the kafka record's timestamp. | no (default = "kafka.timestamp") |
+| `topicColumnName` | String | Name of the column for the kafka record's topic. It is useful when ingesting data from multiple topics. | no (default = "kafka.topic") |
 | `headerColumnPrefix` | String | Custom prefix for all the header columns. | no (default = "kafka.header.") |
 | `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka headers. Supports String types. Because Kafka header values are bytes, the parser decodes them as UTF-8 encoded strings. To change this behavior, implement your own parser based on the encoding style. Change the 'encoding' type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
 | `keyFormat` | [InputFormat](#input-format) | Any [input format](#input-format) to parse the Kafka key. It only processes the first entry of the `inputFormat` field. For details, see [Specifying data format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format). | no |
 | `keyColumnName` | String | Name of the column for the kafka record's key. | no (default = "kafka.key") |
+
 The Kafka input format augments the payload with information from the Kafka timestamp, headers, and key.
 
 If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence.
 This ensures that upgrading a Kafka ingestion to use the Kafka input format (by taking its existing input format and setting it as the `valueFormat`) can be done without losing any of the payload data.
 
-Here is a minimal example that only augments the parsed payload with the Kafka timestamp column:
+Here is a minimal example that only augments the parsed payload with the Kafka timestamp column and Kafka topic column:
 
 ```
 "ioConfig": {
@@ -594,6 +596,7 @@ Here is a complete example:
       "type": "json"
     }
     "timestampColumnName": "kafka.timestamp",
+    "topicColumnName": "kafka.topic",
     "headerFormat": {
       "type": "string",
       "encoding": "UTF-8"
diff --git a/website/.spelling b/website/.spelling
index 32f7786b1b6..cc4e02fcf29 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1258,6 +1258,7 @@ KafkaStringHeaderFormat
 kafka.header.
 kafka.key
 kafka.timestamp
+kafka.topic
 keyColumnName
 keyFormat
 listDelimiter
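As a note outside the patch itself: the augmentation rule the updated docs describe (metadata columns with configurable names, payload winning on name conflicts) can be sketched as a small illustrative model. This is not Druid code; the helper name and signature are invented for the example, but the default column names and the precedence rule come from the documentation above.

```python
def augment_with_kafka_metadata(
    payload: dict,
    *,
    timestamp_ms: int,
    topic: str,
    headers: dict,
    key: str,
    timestamp_column_name: str = "kafka.timestamp",
    topic_column_name: str = "kafka.topic",
    header_column_prefix: str = "kafka.header.",
    key_column_name: str = "kafka.key",
) -> dict:
    """Hypothetical model of the `kafka` input format's output row."""
    # Start from the metadata columns, using the documented default names.
    row = {
        timestamp_column_name: timestamp_ms,
        topic_column_name: topic,
        key_column_name: key,
    }
    # Each Kafka header becomes a column under the header prefix.
    row.update({header_column_prefix + k: v for k, v in headers.items()})
    # The payload is applied last: on name conflicts, payload columns take
    # precedence over metadata columns, per the docs.
    row.update(payload)
    return row

row = augment_with_kafka_metadata(
    {"delta": 31, "namespace": "Main"},
    timestamp_ms=1680795276351,
    topic="wiki-edits",
    headers={"env": "development", "zone": "z1"},
    key="wiki-edit",
)
print(row["kafka.topic"])  # wiki-edits
```

Setting a custom `topic_column_name` here mirrors what `topicColumnName` does in the real input format: it renames the metadata column so it cannot collide with a payload field of the same name.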