support kinesis input format (#16850)

Vadim Ogievetsky 2024-08-07 10:24:28 -07:00 committed by GitHub
parent c6a7ab005f
commit 56c03582cf
4 changed files with 102 additions and 26 deletions
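For orientation, here is a hedged sketch (not part of the diff) of the kind of `inputFormat` the console can now emit for a Kinesis stream. The inner `json` value format is an assumed example; the metadata column names are the defaults introduced by the fields added below.

```ts
// Hypothetical result of enabling "Parse Kinesis metadata" on a JSON payload.
// The wrapper defers actual parsing to valueFormat and only adds the two
// Kinesis metadata columns (names are the defaults added in this commit).
const exampleKinesisInputFormat = {
  type: 'kinesis',
  valueFormat: { type: 'json' },
  timestampColumnName: 'kinesis.timestamp',
  partitionKeyColumnName: 'kinesis.partitionKey',
};
```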

View File

@@ -2441,11 +2441,12 @@ export function fillInputFormatIfNeeded(
   sampleResponse: SampleResponse,
 ): Partial<IngestionSpec> {
   if (deepGet(spec, 'spec.ioConfig.inputFormat.type')) return spec;
+  const specType = getSpecType(spec);
   return deepSet(
     spec,
     'spec.ioConfig.inputFormat',
-    getSpecType(spec) === 'kafka'
+    specType === 'kafka'
       ? guessKafkaInputFormat(
           filterMap(sampleResponse.data, l => l.input),
           typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string',

View File

@@ -60,16 +60,29 @@ const KNOWN_TYPES = [
   'avro_stream',
   'protobuf',
   'regex',
-  'kafka',
   'javascript',
+  'kafka',
+  'kinesis',
 ];
 function generateInputFormatFields(streaming: boolean) {
   return compact([
     {
       name: 'type',
       label: 'Input format',
       type: 'string',
-      suggestions: KNOWN_TYPES,
+      suggestions: [
+        'json',
+        'csv',
+        'tsv',
+        'parquet',
+        'orc',
+        'avro_ocf',
+        'avro_stream',
+        'protobuf',
+        'regex',
+        'javascript',
+      ],
       required: true,
       info: (
         <>
@@ -606,12 +619,35 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
   },
 ];
+export const KINESIS_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
+  {
+    name: 'timestampColumnName',
+    label: 'Kinesis timestamp column name',
+    type: 'string',
+    defaultValue: 'kinesis.timestamp',
+    defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
+    info: `The name of the column for the Kinesis timestamp.`,
+  },
+  {
+    name: 'partitionKeyColumnName',
+    label: 'Kinesis partition key column name',
+    type: 'string',
+    defaultValue: 'kinesis.partitionKey',
+    defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
+    info: `The name of the column for the Kinesis partition key. This field is useful when ingesting data from multiple partitions into the same datasource.`,
+  },
+];
 export function issueWithInputFormat(inputFormat: InputFormat | undefined): string | undefined {
   return AutoForm.issueWithModel(inputFormat, BATCH_INPUT_FORMAT_FIELDS);
 }
+export function isKafkaOrKinesis(type: string | undefined): type is 'kafka' | 'kinesis' {
+  return type === 'kafka' || type === 'kinesis';
+}
 export function inputFormatCanProduceNestedData(inputFormat: InputFormat): boolean {
-  if (inputFormat.type === 'kafka') {
+  if (isKafkaOrKinesis(inputFormat.type)) {
     return Boolean(
       inputFormat.valueFormat && inputFormatCanProduceNestedData(inputFormat.valueFormat),
     );

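The new `isKafkaOrKinesis` helper is a TypeScript type guard, so callers that branch on it (such as `inputFormatCanProduceNestedData` above and the load-data view further down) see the narrowed `'kafka' | 'kinesis'` type inside the branch. A minimal usage sketch, with a `json` value format as an assumed example and imports assumed from this module:

```ts
// Sketch only: assumes InputFormat, isKafkaOrKinesis, and
// inputFormatCanProduceNestedData are imported from this module.
const wrapper: InputFormat = { type: 'kinesis', valueFormat: { type: 'json' } };

if (isKafkaOrKinesis(wrapper.type)) {
  // wrapper.type narrows to 'kafka' | 'kinesis' here; per the change above,
  // nested-data detection defers to the wrapped payload format (json here).
  inputFormatCanProduceNestedData(wrapper); // presumably true for a json valueFormat
}
```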
View File

@@ -251,6 +251,11 @@ const KAFKA_SAMPLE_INPUT_FORMAT: InputFormat = {
   valueFormat: WHOLE_ROW_INPUT_FORMAT,
 };
+const KINESIS_SAMPLE_INPUT_FORMAT: InputFormat = {
+  type: 'kinesis',
+  valueFormat: WHOLE_ROW_INPUT_FORMAT,
+};
 export async function sampleForConnect(
   spec: Partial<IngestionSpec>,
   sampleStrategy: SampleStrategy,
@@ -267,7 +272,11 @@ export async function sampleForConnect(
     ioConfig = deepSet(
       ioConfig,
       'inputFormat',
-      samplerType === 'kafka' ? KAFKA_SAMPLE_INPUT_FORMAT : WHOLE_ROW_INPUT_FORMAT,
+      samplerType === 'kafka'
+        ? KAFKA_SAMPLE_INPUT_FORMAT
+        : samplerType === 'kinesis'
+        ? KINESIS_SAMPLE_INPUT_FORMAT
+        : WHOLE_ROW_INPUT_FORMAT,
     );
   }

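With the wrapper sample format, each sampled Kinesis record carries its metadata next to the raw payload. A hedged sketch of the row shape the sampler would hand back follows; the values are invented, and `raw` is the column that `showKinesisLine` in the load-data view reads the payload from.

```ts
// Invented sample entry; only the column names come from this commit.
const sampleEntry = {
  input: {
    'kinesis.timestamp': 1723047868000,
    'kinesis.partitionKey': 'pk-0',
    raw: '{"isRobot":true,"channel":"#en.wikipedia"}',
  },
};
```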
View File

@@ -113,11 +113,13 @@ import {
   invalidPartitionConfig,
   isDruidSource,
   isEmptyIngestionSpec,
+  isKafkaOrKinesis,
   isStreamingSpec,
   issueWithIoConfig,
   issueWithSampleData,
   joinFilter,
   KAFKA_METADATA_INPUT_FORMAT_FIELDS,
+  KINESIS_METADATA_INPUT_FORMAT_FIELDS,
   KNOWN_FILTER_TYPES,
   MAX_INLINE_DATA_LENGTH,
   METRIC_SPEC_FIELDS,
@@ -244,30 +246,44 @@ function showKafkaLine(line: SampleEntry): string {
   ]).join('\n');
 }
+function showKinesisLine(line: SampleEntry): string {
+  const { input } = line;
+  if (!input) return 'Invalid kinesis row';
+  return compact([
+    `[ Kinesis timestamp: ${input['kinesis.timestamp']}`,
+    input['kinesis.partitionKey'] ? ` Partition key: ${input['kinesis.partitionKey']}` : undefined,
+    ` Payload: ${input.raw}`,
+    ']',
+  ]).join('\n');
+}
 function showBlankLine(line: SampleEntry): string {
   return line.parsed ? `[Row: ${JSONBig.stringify(line.parsed)}]` : '[Binary data]';
 }
 function formatSampleEntries(
   sampleEntries: SampleEntry[],
-  druidSource: boolean,
-  kafkaSource: boolean,
+  specialSource: undefined | 'druid' | 'kafka' | 'kinesis',
 ): string {
   if (!sampleEntries.length) return 'No data returned from sampler';
-  if (druidSource) {
-    return sampleEntries.map(showDruidLine).join('\n');
-  }
+  switch (specialSource) {
+    case 'druid':
+      return sampleEntries.map(showDruidLine).join('\n');
-  if (kafkaSource) {
-    return sampleEntries.map(showKafkaLine).join('\n');
-  }
+    case 'kafka':
+      return sampleEntries.map(showKafkaLine).join('\n');
-  return (
-    sampleEntries.every(l => !l.parsed)
-      ? sampleEntries.map(showBlankLine)
-      : sampleEntries.map(showRawLine)
-  ).join('\n');
+    case 'kinesis':
+      return sampleEntries.map(showKinesisLine).join('\n');
+    default:
+      return (
+        sampleEntries.every(l => !l.parsed)
+          ? sampleEntries.map(showBlankLine)
+          : sampleEntries.map(showRawLine)
+      ).join('\n');
+  }
 }
 function getTimestampSpec(sampleResponse: SampleResponse | null): TimestampSpec {
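Given a sample entry like the sketch in the sampler section, `showKinesisLine` above renders roughly the following into the raw-data preview; the values are invented and the spacing is approximate.

```ts
// Approximate preview text for one Kinesis record (made-up values):
const preview = [
  '[ Kinesis timestamp: 1723047868000',
  ' Partition key: pk-0',
  ' Payload: {"isRobot":true,"channel":"#en.wikipedia"}',
  ']',
].join('\n');
```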
@@ -1239,10 +1255,11 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
   renderConnectStep() {
     const { inputQueryState, sampleStrategy } = this.state;
     const spec = this.getEffectiveSpec();
+    const specType = getSpecType(spec);
     const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;
     const inlineMode = deepGet(spec, 'spec.ioConfig.inputSource.type') === 'inline';
     const druidSource = isDruidSource(spec);
-    const kafkaSource = getSpecType(spec) === 'kafka';
+    const specialSource = druidSource ? 'druid' : isKafkaOrKinesis(specType) ? specType : undefined;
     let mainFill: JSX.Element | string;
     if (inlineMode) {
@@ -1274,7 +1291,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
           <TextArea
             className="raw-lines"
             readOnly
-            value={formatSampleEntries(inputData, druidSource, kafkaSource)}
+            value={formatSampleEntries(inputData, specialSource)}
           />
         )}
         {inputQueryState.isLoading() && <Loader />}
@@ -1544,11 +1561,11 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
         <ParserMessage />
         {!selectedFlattenField && (
           <>
-            {specType !== 'kafka' ? (
+            {!isKafkaOrKinesis(specType) ? (
               normalInputAutoForm
             ) : (
               <>
-                {inputFormat?.type !== 'kafka' ? (
+                {!isKafkaOrKinesis(inputFormat?.type) ? (
                   normalInputAutoForm
                 ) : (
                   <AutoForm
@@ -1563,18 +1580,22 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
     )}
     <FormGroup className="parse-metadata">
       <Switch
-        label="Parse Kafka metadata (ts, headers, key)"
-        checked={inputFormat?.type === 'kafka'}
+        label={
+          specType === 'kafka'
+            ? 'Parse Kafka metadata (ts, headers, key)'
+            : 'Parse Kinesis metadata (ts, partition key)'
+        }
+        checked={isKafkaOrKinesis(inputFormat?.type)}
         onChange={() => {
           this.updateSpecPreview(
-            inputFormat?.type === 'kafka'
+            isKafkaOrKinesis(inputFormat?.type)
               ? deepMove(
                   spec,
                   'spec.ioConfig.inputFormat.valueFormat',
                   'spec.ioConfig.inputFormat',
                 )
               : deepSet(spec, 'spec.ioConfig.inputFormat', {
-                  type: 'kafka',
+                  type: specType,
                   valueFormat: inputFormat,
                 }),
           );
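The switch's `onChange` handler is symmetric. A short sketch of the two `spec.ioConfig.inputFormat` shapes it toggles between, with a `json` payload format as an assumed example:

```ts
// Switch off → plain payload format; switch on → stream wrapper format.
const parsed = { type: 'json' };
const wrapped = { type: 'kinesis', valueFormat: { type: 'json' } };
// Turning the switch ON runs deepSet(spec, 'spec.ioConfig.inputFormat',
// { type: specType, valueFormat: inputFormat }), i.e. `parsed` becomes `wrapped`.
// Turning it OFF runs deepMove(spec, 'spec.ioConfig.inputFormat.valueFormat',
// 'spec.ioConfig.inputFormat'), i.e. `wrapped` collapses back to `parsed`.
```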
@@ -1590,6 +1611,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
             }
           />
         )}
+        {inputFormat?.type === 'kinesis' && (
+          <AutoForm
+            fields={KINESIS_METADATA_INPUT_FORMAT_FIELDS}
+            model={inputFormat}
+            onChange={p =>
+              this.updateSpecPreview(deepSet(spec, 'spec.ioConfig.inputFormat', p))
+            }
+          />
+        )}
       </>
     )}
     {possibleSystemFields.length > 0 && (