support kafka lookups (#13098)

This commit is contained in:
Vadim Ogievetsky 2022-09-16 15:25:25 -07:00 committed by GitHub
parent 9b53b0184f
commit c62a822121
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 127 additions and 35 deletions

View File

@ -74,6 +74,7 @@ exports[`LookupEditDialog matches snapshot 1`] = `
"suggestions": Array [
"map",
"cachedNamespace",
"kafka",
],
"type": "string",
},
@ -398,6 +399,35 @@ exports[`LookupEditDialog matches snapshot 1`] = `
"name": "injective",
"type": "boolean",
},
Object {
"defined": [Function],
"info": "The Kafka topic to read the data from",
"name": "kafkaTopic",
"required": true,
"type": "string",
},
Object {
"defined": [Function],
"height": "100px",
"issueWithValue": [Function],
"name": "kafkaProperties",
"required": true,
"type": "json",
},
Object {
"defaultValue": 0,
"defined": [Function],
"info": "How long to wait for an initial connection",
"name": "connectTimeout",
"type": "number",
},
Object {
"defaultValue": false,
"defined": [Function],
"info": "If the underlying map is one-to-one (keys and values are unique) then optimizations can occur internally by setting this to true",
"name": "isOneToOne",
"type": "boolean",
},
]
}
model={

View File

@ -57,10 +57,20 @@ export interface NamespaceParseSpec {
export interface LookupSpec {
readonly type: string;
// type: map
readonly map?: Record<string, string | number>;
// type: cachedNamespace
readonly extractionNamespace?: ExtractionNamespaceSpec;
readonly firstCacheTimeout?: number;
readonly injective?: boolean;
// type: kafka
readonly kafkaTopic?: string;
readonly kafkaProperties?: Record<string, any>;
readonly connectTimeout?: number;
readonly isOneToOne?: boolean;
}
function issueWithUri(uri: string): string | undefined {
@ -83,7 +93,7 @@ export const LOOKUP_FIELDS: Field<LookupSpec>[] = [
{
name: 'type',
type: 'string',
suggestions: ['map', 'cachedNamespace'],
suggestions: ['map', 'cachedNamespace', 'kafka'],
required: true,
adjustment: l => {
if (l.type === 'map' && !l.map) {
@ -92,6 +102,9 @@ export const LOOKUP_FIELDS: Field<LookupSpec>[] = [
if (l.type === 'cachedNamespace' && !deepGet(l, 'extractionNamespace.type')) {
return deepSet(l, 'extractionNamespace', { type: 'uri', pollPeriod: 'PT1H' });
}
if (l.type === 'kafka' && !deepGet(l, 'kafkaProperties')) {
return deepSet(l, 'kafkaProperties', { 'bootstrap.servers': '' });
}
return l;
},
},
@ -396,6 +409,42 @@ export const LOOKUP_FIELDS: Field<LookupSpec>[] = [
defined: typeIs('cachedNamespace'),
info: `If the underlying map is injective (keys and values are unique) then optimizations can occur internally by setting this to true`,
},
// kafka lookups
{
name: 'kafkaTopic',
type: 'string',
defined: typeIs('kafka'),
required: true,
info: `The Kafka topic to read the data from`,
},
{
name: 'kafkaProperties',
type: 'json',
height: '100px',
defined: typeIs('kafka'),
required: true,
issueWithValue: value => {
if (!value) return 'kafkaProperties must be defined';
if (typeof value !== 'object') return `kafkaProperties must be an object`;
if (!value['bootstrap.servers']) return 'bootstrap.servers must be defined';
return;
},
},
{
name: 'connectTimeout',
type: 'number',
defaultValue: 0,
defined: typeIs('kafka'),
info: `How long to wait for an initial connection`,
},
{
name: 'isOneToOne',
type: 'boolean',
defaultValue: false,
defined: typeIs('kafka'),
info: `If the underlying map is one-to-one (keys and values are unique) then optimizations can occur internally by setting this to true`,
},
];
export function isLookupInvalid(
@ -410,43 +459,54 @@ export function isLookupInvalid(
}
export function lookupSpecSummary(spec: LookupSpec): string {
const { map, extractionNamespace } = spec;
const { type, map, extractionNamespace, kafkaTopic, kafkaProperties } = spec;
if (map) {
return pluralIfNeeded(Object.keys(map).length, 'key');
}
switch (type) {
case 'map':
if (!map) return 'No map';
return pluralIfNeeded(Object.keys(map).length, 'key');
if (extractionNamespace) {
switch (extractionNamespace.type) {
case 'uri':
if (extractionNamespace.uriPrefix) {
return `URI prefix: ${extractionNamespace.uriPrefix}, Match: ${
extractionNamespace.fileRegex || '.*'
}`;
}
if (extractionNamespace.uri) {
return `URI: ${extractionNamespace.uri}`;
}
return 'Unknown extractionNamespace lookup';
case 'cachedNamespace':
if (!extractionNamespace) return 'No extractionNamespace';
switch (extractionNamespace.type) {
case 'uri':
if (extractionNamespace.uriPrefix) {
return `URI prefix: ${extractionNamespace.uriPrefix}, Match: ${
extractionNamespace.fileRegex || '.*'
}`;
}
if (extractionNamespace.uri) {
return `URI: ${extractionNamespace.uri}`;
}
return 'Unknown extractionNamespace lookup';
case 'jdbc': {
const columns = [
`${extractionNamespace.keyColumn} AS key`,
`${extractionNamespace.valueColumn} AS value`,
];
if (extractionNamespace.tsColumn) {
columns.push(`${extractionNamespace.tsColumn} AS ts`);
case 'jdbc': {
const columns = [
`${extractionNamespace.keyColumn} AS key`,
`${extractionNamespace.valueColumn} AS value`,
];
if (extractionNamespace.tsColumn) {
columns.push(`${extractionNamespace.tsColumn} AS ts`);
}
const queryParts = ['SELECT', columns.join(', '), `FROM ${extractionNamespace.table}`];
if (extractionNamespace.filter) {
queryParts.push(`WHERE ${extractionNamespace.filter}`);
}
return `${
extractionNamespace.connectorConfig?.connectURI || 'No connectURI'
} [${queryParts.join(' ')}]`;
}
const queryParts = ['SELECT', columns.join(', '), `FROM ${extractionNamespace.table}`];
if (extractionNamespace.filter) {
queryParts.push(`WHERE ${extractionNamespace.filter}`);
}
return `${
extractionNamespace.connectorConfig?.connectURI || 'No connectURI'
} [${queryParts.join(' ')}]`;
default:
return `Unknown lookup extractionNamespace type ${extractionNamespace.type}`;
}
}
}
return 'Unknown lookup';
case 'kafka': {
const servers = kafkaProperties?.['bootstrap.servers'];
return `Topic: ${kafkaTopic}` + (servers ? ` (on: ${servers})` : '');
}
default:
return `Unknown lookup type ${type}`;
}
}

View File

@ -419,7 +419,9 @@ export class LookupsView extends React.PureComponent<LookupsViewProps, LookupsVi
className: 'padded',
accessor: row => deepGet(row, 'spec.extractionNamespace.pollPeriod'),
Cell: ({ original }) => {
if (original.spec.type === 'map') return 'Static map';
const { type } = original.spec;
if (type === 'map') return 'Static map';
if (type === 'kafka') return 'Kafka based';
const pollPeriod = deepGet(original, 'spec.extractionNamespace.pollPeriod');
if (!pollPeriod) {
return (