mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
add Kafka topic column controls (#14865)
This commit is contained in:
parent
a38b4f0491
commit
631dc3b589
@ -42,6 +42,7 @@ export interface InputFormat {
|
|||||||
|
|
||||||
// type: kafka
|
// type: kafka
|
||||||
readonly timestampColumnName?: string;
|
readonly timestampColumnName?: string;
|
||||||
|
readonly topicColumnName?: string;
|
||||||
readonly headerFormat?: { type: 'string'; encoding?: string };
|
readonly headerFormat?: { type: 'string'; encoding?: string };
|
||||||
readonly headerColumnPrefix?: string;
|
readonly headerColumnPrefix?: string;
|
||||||
readonly keyFormat?: InputFormat;
|
readonly keyFormat?: InputFormat;
|
||||||
@ -253,7 +254,15 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
|
|||||||
type: 'string',
|
type: 'string',
|
||||||
defaultValue: 'kafka.timestamp',
|
defaultValue: 'kafka.timestamp',
|
||||||
defined: typeIsKnown(KNOWN_TYPES, 'kafka'),
|
defined: typeIsKnown(KNOWN_TYPES, 'kafka'),
|
||||||
info: `Name of the column for the kafka record's timestamp.`,
|
info: `Name of the column for the Kafka record's timestamp.`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'topicColumnName',
|
||||||
|
label: 'Kafka topic column name',
|
||||||
|
type: 'string',
|
||||||
|
defaultValue: 'kafka.topic',
|
||||||
|
defined: typeIsKnown(KNOWN_TYPES, 'kafka'),
|
||||||
|
info: `Name of the column for the topic from which the Kafka record came.`,
|
||||||
},
|
},
|
||||||
|
|
||||||
// -----------------------------------------------------
|
// -----------------------------------------------------
|
||||||
|
@ -226,6 +226,7 @@ function showKafkaLine(line: SampleEntry): string {
|
|||||||
if (!input) return 'Invalid kafka row';
|
if (!input) return 'Invalid kafka row';
|
||||||
return compact([
|
return compact([
|
||||||
`[ Kafka timestamp: ${input['kafka.timestamp']}`,
|
`[ Kafka timestamp: ${input['kafka.timestamp']}`,
|
||||||
|
` Topic: ${input['kafka.topic']}`,
|
||||||
...filterMap(Object.entries(input), ([k, v]) => {
|
...filterMap(Object.entries(input), ([k, v]) => {
|
||||||
if (!k.startsWith('kafka.header.')) return;
|
if (!k.startsWith('kafka.header.')) return;
|
||||||
return ` Header: ${k.slice(13)}=${v}`;
|
return ` Header: ${k.slice(13)}=${v}`;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user