Web console: Add input format props (#15950)

* fix typo

* add Protobuf

* better padding
Vadim Ogievetsky 2024-02-26 11:28:09 -08:00 committed by GitHub
parent 67a6224d91
commit 28b3e117cf
4 changed files with 143 additions and 22 deletions

View File

@@ -22,7 +22,7 @@ import React from 'react';
 import type { Field } from '../../components';
 import { AutoForm, ExternalLink } from '../../components';
 import { getLink } from '../../links';
-import { compact, deepGet, deepSet, oneOf, typeIsKnown } from '../../utils';
+import { compact, deepGet, deepSet, oneOf, oneOfKnown, typeIsKnown } from '../../utils';
 import type { FlattenSpec } from '../flatten-spec/flatten-spec';

 export interface InputFormat {
@@ -58,6 +58,7 @@ const KNOWN_TYPES = [
   'orc',
   'avro_ocf',
   'avro_stream',
+  'protobuf',
   'regex',
   'kafka',
   'javascript',
@@ -230,6 +231,44 @@ function generateInputFormatFields(streaming: boolean) {
       defined: typeIsKnown(KNOWN_TYPES, 'csv', 'tsv', 'regex'),
       info: <>A custom delimiter for multi-value dimensions.</>,
     },
+    {
+      name: 'avroBytesDecoder',
+      type: 'json',
+      defined: typeIsKnown(KNOWN_TYPES, 'avro_stream'),
+      required: true,
+      placeholder: `{ type: "schema_repo", ... }`,
+      info: (
+        <>
+          <p>Specifies how to decode bytes to Avro record.</p>
+          <p>
+            For more details refer to the{' '}
+            <ExternalLink href={`${getLink('DOCS')}/ingestion/data-formats/#avro-bytes-decoder`}>
+              documentation
+            </ExternalLink>
+            .
+          </p>
+        </>
+      ),
+    },
+    {
+      name: 'schema',
+      type: 'json',
+      defined: typeIsKnown(KNOWN_TYPES, 'avro_ocf'),
+      info: (
+        <>
+          Define a reader schema to be used when parsing Avro records. This is useful when parsing
+          multiple versions of Avro OCF file data.
+        </>
+      ),
+    },
+    {
+      name: 'protoBytesDecoder',
+      type: 'json',
+      defined: typeIsKnown(KNOWN_TYPES, 'protobuf'),
+      required: true,
+      placeholder: `{ ... }`,
+      info: <>Specifies how to decode bytes to Protobuf record.</>,
+    },
     {
       name: 'binaryAsString',
       type: 'boolean',
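
The three new fields above surface decoder/schema JSON that previously had to be written into the spec by hand. As a rough illustration (not taken from this commit — the decoder layout follows the Druid data-formats documentation, and the record schema is made up), a value a user might enter for an avro_stream input format with an inline reader schema:

const exampleAvroStreamInputFormat = {
  type: 'avro_stream',
  avroBytesDecoder: {
    // 'schema_inline' is one decoder type described in the docs;
    // 'schema_repo' (as in the placeholder above) is another.
    type: 'schema_inline',
    schema: {
      type: 'record',
      name: 'SomeData', // made-up record name
      fields: [{ name: 'timestamp', type: 'long' }], // made-up fields
    },
  },
};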
@@ -320,7 +359,7 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
     name: 'keyFormat.featureSpec',
     label: 'Kafka key JSON parser features',
     type: 'json',
-    defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'json',
+    defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'json'),
     hideInMore: true,
     info: (
       <>
@@ -342,7 +381,7 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
     name: 'keyFormat.assumeNewlineDelimited',
     label: 'Kafka key assume newline delimited',
     type: 'boolean',
-    defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'json',
+    defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'json'),
     disabled: inputFormat => Boolean(inputFormat.useJsonNodeReader),
     defaultValue: false,
     hideInMore: true,
@@ -370,7 +409,7 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
     name: 'keyFormat.useJsonNodeReader',
     label: 'Kafka key use JSON node reader',
     type: 'boolean',
-    defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'json',
+    defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'json'),
     disabled: inputFormat => Boolean(inputFormat.assumeNewlineDelimited),
     defaultValue: false,
     hideInMore: true,
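
The recurring change in these hunks swaps a strict `deepGet(...) === 'json'` check for `oneOfKnown(..., KNOWN_TYPES, 'json')`. The helper itself is not shown in this diff; a minimal hypothetical sketch of the plausible semantics (the real implementation lives in '../../utils') is that it behaves like oneOf for recognized types but stays undecided for types outside KNOWN_TYPES, so fields are not spuriously shown or hidden for format types this version of the form does not know about:

// Hypothetical sketches, not the actual utils code.
function oneOf<T>(value: T, ...options: T[]): boolean {
  return options.includes(value);
}

function oneOfKnown<T>(value: T | undefined, known: T[], ...options: T[]): boolean | undefined {
  // A defined value outside the known list: return undefined
  // rather than a hard true/false.
  if (typeof value !== 'undefined' && !known.includes(value)) return;
  return options.includes(value as T);
}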
@@ -400,14 +439,15 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
     type: 'string',
     defaultValue: '\t',
     suggestions: ['\t', ';', '|', '#'],
-    defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'tsv',
+    defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'tsv'),
     info: <>A custom delimiter for data values.</>,
   },
   {
     name: 'keyFormat.pattern',
     label: 'Kafka key pattern',
     type: 'string',
-    defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'regex',
+    defined: inputFormat =>
+      oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'regex'),
     required: true,
   },
   {
@@ -415,7 +455,8 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
     label: 'Kafka key skip header rows',
     type: 'number',
     defaultValue: 0,
-    defined: inputFormat => oneOf(deepGet(inputFormat, 'keyFormat.type'), 'csv', 'tsv'),
+    defined: inputFormat =>
+      oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'csv', 'tsv'),
     min: 0,
     info: (
       <>
@@ -427,7 +468,8 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
     name: 'keyFormat.findColumnsFromHeader',
     label: 'Kafka key find columns from header',
     type: 'boolean',
-    defined: inputFormat => oneOf(deepGet(inputFormat, 'keyFormat.type'), 'csv', 'tsv'),
+    defined: inputFormat =>
+      oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'csv', 'tsv'),
     required: true,
     hideInMore: true,
     info: (
@@ -463,12 +505,57 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
     type: 'string',
     defaultValue: '\x01',
     suggestions: ['\x01', '\x00'],
-    defined: inputFormat => oneOf(deepGet(inputFormat, 'keyFormat.type'), 'csv', 'tsv', 'regex'),
+    defined: inputFormat =>
+      oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'csv', 'tsv', 'regex'),
     info: <>A custom delimiter for multi-value dimensions.</>,
   },
+  {
+    name: 'keyFormat.avroBytesDecoder',
+    label: 'Kafka key Avro bytes decoder',
+    type: 'json',
+    defined: inputFormat =>
+      oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'avro_stream'),
+    required: true,
+    placeholder: `{ type: "schema_repo", ... }`,
+    info: (
+      <>
+        <p>Specifies how to decode bytes to Avro record.</p>
+        <p>
+          For more details refer to the{' '}
+          <ExternalLink href={`${getLink('DOCS')}/ingestion/data-formats/#avro-bytes-decoder`}>
+            documentation
+          </ExternalLink>
+          .
+        </p>
+      </>
+    ),
+  },
+  {
+    name: 'keyFormat.schema',
+    label: 'Key format schema',
+    type: 'json',
+    defined: inputFormat =>
+      oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'avro_ocf'),
+    info: (
+      <>
+        Define a reader schema to be used when parsing Avro records. This is useful when parsing
+        multiple versions of Avro OCF file data.
+      </>
+    ),
+  },
+  {
+    name: 'keyFormat.protoBytesDecoder',
+    label: 'Kafka key proto bytes decoder',
+    type: 'json',
+    defined: inputFormat =>
+      oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'protobuf'),
+    required: true,
+    placeholder: `{ ... }`,
+    info: <>Specifies how to decode bytes to Protobuf record.</>,
+  },
   {
     name: 'keyFormat.binaryAsString',
-    label: 'Kafka key list binary as string',
+    label: 'Kafka key binary as string',
     type: 'boolean',
     defaultValue: false,
     defined: inputFormat =>
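
For the key-format variants added here, the values live under keyFormat.* of a 'kafka' input format. A sketch of what a user-entered spec might look like (illustrative only — the 'file' decoder shape follows the Druid Protobuf extension docs, and the descriptor path and message type are made up):

const exampleKafkaInputFormat = {
  type: 'kafka',
  valueFormat: { type: 'json' },
  keyFormat: {
    type: 'protobuf',
    protoBytesDecoder: {
      type: 'file',
      descriptor: 'file:///tmp/metrics.desc', // made-up descriptor file
      protoMessageType: 'Metrics', // made-up message type
    },
  },
};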
@@ -498,7 +585,7 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
     label: 'Kafka header format type',
     type: 'string',
     defined: typeIsKnown(KNOWN_TYPES, 'kafka'),
-    placeholder: `(don't parse Kafka herders)`,
+    placeholder: `(don't parse Kafka headers)`,
     suggestions: [undefined, 'string'],
   },
   {
@@ -529,5 +616,5 @@ export function inputFormatCanProduceNestedData(inputFormat: InputFormat): boole
       inputFormat.valueFormat && inputFormatCanProduceNestedData(inputFormat.valueFormat),
     );
   }
-  return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf', 'avro_stream');
+  return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf', 'avro_stream', 'protobuf');
 }
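
Pieced together from the context lines of this last hunk, the updated function reads roughly as follows (the `if` wrapper around the kafka branch is inferred, not shown in the diff): for the composite 'kafka' format it recurses into the value format, otherwise it checks the type directly, now including 'protobuf'.

export function inputFormatCanProduceNestedData(inputFormat: InputFormat): boolean {
  if (inputFormat.type === 'kafka') {
    // A kafka input format wraps another format; defer to the value format.
    return Boolean(
      inputFormat.valueFormat && inputFormatCanProduceNestedData(inputFormat.valueFormat),
    );
  }
  return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf', 'avro_stream', 'protobuf');
}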

View File

@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+.destination-pages-pane {
+  .download-button {
+    margin-top: 4px;
+    margin-left: 2px;
+  }
+}

View File

@@ -35,6 +35,8 @@ import {
   wait,
 } from '../../../utils';

+import './destination-pages-pane.scss';
+
 type ResultFormat = 'object' | 'array' | 'objectLines' | 'arrayLines' | 'csv';

 const RESULT_FORMATS: ResultFormat[] = ['objectLines', 'object', 'arrayLines', 'array', 'csv'];
@@ -86,24 +88,28 @@ export const DestinationPagesPane = React.memo(function DestinationPagesPane(
     );
   }

-  function getPageFilename(pageIndex: number) {
-    return `${id}_page${pageIndex}.${desiredExtension}`;
+  function getPageFilename(pageIndex: number, numPages: number) {
+    const numPagesString = String(numPages);
+    const pageNumberString = String(pageIndex + 1).padStart(numPagesString.length, '0');
+    return `${id}_page_${pageNumberString}_of_${numPagesString}.${desiredExtension}`;
   }

   async function downloadAllPages() {
     if (!pages) return;
+    const numPages = pages.length;
     for (let i = 0; i < pages.length; i++) {
-      downloadUrl(getPageUrl(i), getPageFilename(i));
+      downloadUrl(getPageUrl(i), getPageFilename(i, numPages));
       await wait(100);
     }
   }

+  const numPages = pages.length;
   return (
-    <div className="execution-details-pane">
+    <div className="destination-pages-pane">
       <p>
         {`${
           typeof numTotalRows === 'number' ? pluralIfNeeded(numTotalRows, 'row') : 'Results'
-        } have been written to ${pluralIfNeeded(pages.length, 'page')}. `}
+        } have been written to ${pluralIfNeeded(numPages, 'page')}. `}
       </p>
       <p>
         Format when downloading:{' '}
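
The new getPageFilename pads the 1-based page number to the width of the total page count, so downloaded files sort lexicographically in page order. A standalone sketch of the behavior (the id and extension values here are made up):

function pageFilename(id: string, ext: string, pageIndex: number, numPages: number): string {
  const numPagesString = String(numPages);
  const pageNumberString = String(pageIndex + 1).padStart(numPagesString.length, '0');
  return `${id}_page_${pageNumberString}_of_${numPagesString}.${ext}`;
}

pageFilename('query-abc', 'csv', 0, 120); // => 'query-abc_page_001_of_120.csv'
pageFilename('query-abc', 'csv', 99, 120); // => 'query-abc_page_100_of_120.csv'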
@@ -133,7 +139,7 @@ export const DestinationPagesPane = React.memo(function DestinationPagesPane(
         <Button
           intent={Intent.PRIMARY}
           icon={IconNames.DOWNLOAD}
-          text={`Download all data (${pluralIfNeeded(pages.length, 'file')})`}
+          text={`Download all data (${pluralIfNeeded(numPages, 'file')})`}
           onClick={() => void downloadAllPages()}
         />
       )}
@@ -142,11 +148,11 @@ export const DestinationPagesPane = React.memo(function DestinationPagesPane(
         data={pages}
         loading={false}
         sortable={false}
-        defaultPageSize={clamp(pages.length, 1, SMALL_TABLE_PAGE_SIZE)}
-        showPagination={pages.length > SMALL_TABLE_PAGE_SIZE}
+        defaultPageSize={clamp(numPages, 1, SMALL_TABLE_PAGE_SIZE)}
+        showPagination={numPages > SMALL_TABLE_PAGE_SIZE}
         columns={[
           {
-            Header: 'Page number',
+            Header: 'Page ID',
             id: 'id',
             accessor: 'id',
             className: 'padded',
@@ -175,11 +181,12 @@ export const DestinationPagesPane = React.memo(function DestinationPagesPane(
             width: 300,
             Cell: ({ value }) => (
               <AnchorButton
+                className="download-button"
                 icon={IconNames.DOWNLOAD}
                 text="Download"
                 minimal
                 href={getPageUrl(value)}
-                download={getPageFilename(value)}
+                download={getPageFilename(value, numPages)}
               />
             ),
           },

View File

@@ -289,6 +289,9 @@ export const InputSourceStep = React.memo(function InputSourceStep(props: InputS
                   <li>
                     <ExternalLink href="https://avro.apache.org">Avro</ExternalLink>
                   </li>
+                  <li>
+                    <ExternalLink href="https://protobuf.dev">Protobuf</ExternalLink>
+                  </li>
                   <li>
                     Any line format that can be parsed with a custom regular expression (regex)
                   </li>