diff --git a/web-console/src/dialogs/lookup-edit-dialog/__snapshots__/lookup-edit-dialog.spec.tsx.snap b/web-console/src/dialogs/lookup-edit-dialog/__snapshots__/lookup-edit-dialog.spec.tsx.snap index 1c880e9340c..ea356d5370f 100644 --- a/web-console/src/dialogs/lookup-edit-dialog/__snapshots__/lookup-edit-dialog.spec.tsx.snap +++ b/web-console/src/dialogs/lookup-edit-dialog/__snapshots__/lookup-edit-dialog.spec.tsx.snap @@ -74,6 +74,7 @@ exports[`LookupEditDialog matches snapshot 1`] = ` "suggestions": Array [ "map", "cachedNamespace", + "kafka", ], "type": "string", }, @@ -398,6 +399,35 @@ exports[`LookupEditDialog matches snapshot 1`] = ` "name": "injective", "type": "boolean", }, + Object { + "defined": [Function], + "info": "The Kafka topic to read the data from", + "name": "kafkaTopic", + "required": true, + "type": "string", + }, + Object { + "defined": [Function], + "height": "100px", + "issueWithValue": [Function], + "name": "kafkaProperties", + "required": true, + "type": "json", + }, + Object { + "defaultValue": 0, + "defined": [Function], + "info": "How long to wait for an initial connection", + "name": "connectTimeout", + "type": "number", + }, + Object { + "defaultValue": false, + "defined": [Function], + "info": "If the underlying map is one-to-one (keys and values are unique) then optimizations can occur internally by setting this to true", + "name": "isOneToOne", + "type": "boolean", + }, ] } model={ diff --git a/web-console/src/druid-models/lookup-spec/lookup-spec.tsx b/web-console/src/druid-models/lookup-spec/lookup-spec.tsx index b1328a8f12a..3e964949538 100644 --- a/web-console/src/druid-models/lookup-spec/lookup-spec.tsx +++ b/web-console/src/druid-models/lookup-spec/lookup-spec.tsx @@ -57,10 +57,20 @@ export interface NamespaceParseSpec { export interface LookupSpec { readonly type: string; + + // type: map readonly map?: Record; + + // type: cachedNamespace readonly extractionNamespace?: ExtractionNamespaceSpec; readonly firstCacheTimeout?: number; readonly injective?: boolean; + + // type: kafka + readonly kafkaTopic?: string; + readonly kafkaProperties?: Record; + readonly connectTimeout?: number; + readonly isOneToOne?: boolean; } function issueWithUri(uri: string): string | undefined { @@ -83,7 +93,7 @@ export const LOOKUP_FIELDS: Field[] = [ { name: 'type', type: 'string', - suggestions: ['map', 'cachedNamespace'], + suggestions: ['map', 'cachedNamespace', 'kafka'], required: true, adjustment: l => { if (l.type === 'map' && !l.map) { @@ -92,6 +102,9 @@ export const LOOKUP_FIELDS: Field[] = [ if (l.type === 'cachedNamespace' && !deepGet(l, 'extractionNamespace.type')) { return deepSet(l, 'extractionNamespace', { type: 'uri', pollPeriod: 'PT1H' }); } + if (l.type === 'kafka' && !deepGet(l, 'kafkaProperties')) { + return deepSet(l, 'kafkaProperties', { 'bootstrap.servers': '' }); + } return l; }, }, @@ -396,6 +409,42 @@ export const LOOKUP_FIELDS: Field[] = [ defined: typeIs('cachedNamespace'), info: `If the underlying map is injective (keys and values are unique) then optimizations can occur internally by setting this to true`, }, + + // kafka lookups + { + name: 'kafkaTopic', + type: 'string', + defined: typeIs('kafka'), + required: true, + info: `The Kafka topic to read the data from`, + }, + { + name: 'kafkaProperties', + type: 'json', + height: '100px', + defined: typeIs('kafka'), + required: true, + issueWithValue: value => { + if (!value) return 'kafkaProperties must be defined'; + if (typeof value !== 'object') return `kafkaProperties must be an object`; + if (!value['bootstrap.servers']) return 'bootstrap.servers must be defined'; + return; + }, + }, + { + name: 'connectTimeout', + type: 'number', + defaultValue: 0, + defined: typeIs('kafka'), + info: `How long to wait for an initial connection`, + }, + { + name: 'isOneToOne', + type: 'boolean', + defaultValue: false, + defined: typeIs('kafka'), + info: `If the underlying map is one-to-one (keys and values are unique) then optimizations can occur internally by setting this to true`, + }, ]; export function isLookupInvalid( @@ -410,43 +459,54 @@ export function isLookupInvalid( } export function lookupSpecSummary(spec: LookupSpec): string { - const { map, extractionNamespace } = spec; + const { type, map, extractionNamespace, kafkaTopic, kafkaProperties } = spec; - if (map) { - return pluralIfNeeded(Object.keys(map).length, 'key'); - } + switch (type) { + case 'map': + if (!map) return 'No map'; + return pluralIfNeeded(Object.keys(map).length, 'key'); - if (extractionNamespace) { - switch (extractionNamespace.type) { - case 'uri': - if (extractionNamespace.uriPrefix) { - return `URI prefix: ${extractionNamespace.uriPrefix}, Match: ${ - extractionNamespace.fileRegex || '.*' - }`; - } - if (extractionNamespace.uri) { - return `URI: ${extractionNamespace.uri}`; - } - return 'Unknown extractionNamespace lookup'; + case 'cachedNamespace': + if (!extractionNamespace) return 'No extractionNamespace'; + switch (extractionNamespace.type) { + case 'uri': + if (extractionNamespace.uriPrefix) { + return `URI prefix: ${extractionNamespace.uriPrefix}, Match: ${ + extractionNamespace.fileRegex || '.*' + }`; + } + if (extractionNamespace.uri) { + return `URI: ${extractionNamespace.uri}`; + } + return 'Unknown extractionNamespace lookup'; - case 'jdbc': { - const columns = [ - `${extractionNamespace.keyColumn} AS key`, - `${extractionNamespace.valueColumn} AS value`, - ]; - if (extractionNamespace.tsColumn) { - columns.push(`${extractionNamespace.tsColumn} AS ts`); + case 'jdbc': { + const columns = [ + `${extractionNamespace.keyColumn} AS key`, + `${extractionNamespace.valueColumn} AS value`, + ]; + if (extractionNamespace.tsColumn) { + columns.push(`${extractionNamespace.tsColumn} AS ts`); + } + const queryParts = ['SELECT', columns.join(', '), `FROM ${extractionNamespace.table}`]; + if (extractionNamespace.filter) { + queryParts.push(`WHERE ${extractionNamespace.filter}`); + } + return `${ + extractionNamespace.connectorConfig?.connectURI || 'No connectURI' + } [${queryParts.join(' ')}]`; } - const queryParts = ['SELECT', columns.join(', '), `FROM ${extractionNamespace.table}`]; - if (extractionNamespace.filter) { - queryParts.push(`WHERE ${extractionNamespace.filter}`); - } - return `${ - extractionNamespace.connectorConfig?.connectURI || 'No connectURI' - } [${queryParts.join(' ')}]`; + + default: + return `Unknown lookup extractionNamespace type ${extractionNamespace.type}`; } - } - } - return 'Unknown lookup'; + case 'kafka': { + const servers = kafkaProperties?.['bootstrap.servers']; + return `Topic: ${kafkaTopic}` + (servers ? ` (on: ${servers})` : ''); + } + + default: + return `Unknown lookup type ${type}`; + } } diff --git a/web-console/src/views/lookups-view/lookups-view.tsx b/web-console/src/views/lookups-view/lookups-view.tsx index 3f2650db6cc..8599f6d10f1 100644 --- a/web-console/src/views/lookups-view/lookups-view.tsx +++ b/web-console/src/views/lookups-view/lookups-view.tsx @@ -419,7 +419,9 @@ export class LookupsView extends React.PureComponent deepGet(row, 'spec.extractionNamespace.pollPeriod'), Cell: ({ original }) => { - if (original.spec.type === 'map') return 'Static map'; + const { type } = original.spec; + if (type === 'map') return 'Static map'; + if (type === 'kafka') return 'Kafka based'; const pollPeriod = deepGet(original, 'spec.extractionNamespace.pollPeriod'); if (!pollPeriod) { return (