From 631dc3b589e01288680d28ece2f709917f32d710 Mon Sep 17 00:00:00 2001 From: Vadim Ogievetsky Date: Mon, 21 Aug 2023 09:03:23 -0700 Subject: [PATCH] add Kafka topic column controls (#14865) --- .../src/druid-models/input-format/input-format.tsx | 11 ++++++++++- .../src/views/load-data-view/load-data-view.tsx | 1 + 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/web-console/src/druid-models/input-format/input-format.tsx b/web-console/src/druid-models/input-format/input-format.tsx index 38bfba50870..6da11aca2b3 100644 --- a/web-console/src/druid-models/input-format/input-format.tsx +++ b/web-console/src/druid-models/input-format/input-format.tsx @@ -42,6 +42,7 @@ export interface InputFormat { // type: kafka readonly timestampColumnName?: string; + readonly topicColumnName?: string; readonly headerFormat?: { type: 'string'; encoding?: string }; readonly headerColumnPrefix?: string; readonly keyFormat?: InputFormat; @@ -253,7 +254,15 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field[] = [ type: 'string', defaultValue: 'kafka.timestamp', defined: typeIsKnown(KNOWN_TYPES, 'kafka'), - info: `Name of the column for the kafka record's timestamp.`, + info: `Name of the column for the Kafka record's timestamp.`, + }, + { + name: 'topicColumnName', + label: 'Kafka topic column name', + type: 'string', + defaultValue: 'kafka.topic', + defined: typeIsKnown(KNOWN_TYPES, 'kafka'), + info: `Name of the column for the topic from which the Kafka record came.`, }, // ----------------------------------------------------- diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx index d38355601de..71fb889163a 100644 --- a/web-console/src/views/load-data-view/load-data-view.tsx +++ b/web-console/src/views/load-data-view/load-data-view.tsx @@ -226,6 +226,7 @@ function showKafkaLine(line: SampleEntry): string { if (!input) return 'Invalid kafka row'; return compact([ `[ Kafka timestamp: ${input['kafka.timestamp']}`, + ` Topic: ${input['kafka.topic']}`, ...filterMap(Object.entries(input), ([k, v]) => { if (!k.startsWith('kafka.header.')) return; return ` Header: ${k.slice(13)}=${v}`;