Web console: streaming json input format specifics (#13381)

* streaming json input format specifics

* goodies
This commit is contained in:
Vadim Ogievetsky 2022-11-18 14:15:16 -08:00 committed by GitHub
parent a3d45f6086
commit c628947c31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 257 additions and 190 deletions

View File

@ -19,7 +19,7 @@ exports[`ShowJson matches snapshot 1`] = `
<span <span
class="bp4-button-text" class="bp4-button-text"
> >
Save Download
</span> </span>
</button> </button>
<button <button

View File

@ -56,7 +56,7 @@ export const ShowJson = React.memo(function ShowJson(props: ShowJsonProps) {
{downloadFilename && ( {downloadFilename && (
<Button <Button
disabled={jsonState.loading} disabled={jsonState.loading}
text="Save" text="Download"
minimal minimal
onClick={() => downloadFile(jsonValue, 'json', downloadFilename)} onClick={() => downloadFile(jsonValue, 'json', downloadFilename)}
/> />

View File

@ -32,7 +32,7 @@ exports[`ShowLog describe show log 1`] = `
<span <span
class="bp4-button-text" class="bp4-button-text"
> >
Save Download
</span> </span>
</a> </a>
<button <button

View File

@ -155,7 +155,7 @@ export class ShowLog extends React.PureComponent<ShowLogProps, ShowLogState> {
<ButtonGroup className="right-buttons"> <ButtonGroup className="right-buttons">
{downloadFilename && ( {downloadFilename && (
<AnchorButton <AnchorButton
text="Save" text="Download"
minimal minimal
download={downloadFilename} download={downloadFilename}
href={UrlBaser.base(endpoint)} href={UrlBaser.base(endpoint)}

View File

@ -17,7 +17,7 @@ exports[`ShowValue matches snapshot 1`] = `
<span <span
class="bp4-button-text" class="bp4-button-text"
> >
Save Download
</span> </span>
</button> </button>
</div> </div>

View File

@ -41,7 +41,7 @@ export const ShowValue = React.memo(function ShowValue(props: ShowValueProps) {
)} )}
{downloadFilename && ( {downloadFilename && (
<Button <Button
text="Save" text="Download"
minimal minimal
onClick={() => downloadFile(jsonValue, 'json', downloadFilename)} onClick={() => downloadFile(jsonValue, 'json', downloadFilename)}
/> />

View File

@ -138,7 +138,7 @@ exports[`SegmentTableActionDialog matches snapshot 1`] = `
<span <span
class="bp4-button-text" class="bp4-button-text"
> >
Save Download
</span> </span>
</button> </button>
<button <button

View File

@ -192,7 +192,7 @@ exports[`SupervisorTableActionDialog matches snapshot 1`] = `
<span <span
class="bp4-button-text" class="bp4-button-text"
> >
Save Download
</span> </span>
</button> </button>
<button <button

View File

@ -192,7 +192,7 @@ exports[`TaskTableActionDialog matches snapshot 1`] = `
<span <span
class="bp4-button-text" class="bp4-button-text"
> >
Save Download
</span> </span>
</button> </button>
<button <button

View File

@ -285,12 +285,8 @@ export function getSpecType(spec: Partial<IngestionSpec>): IngestionType {
); );
} }
export function isTask(spec: Partial<IngestionSpec>) { export function isStreamingSpec(spec: Partial<IngestionSpec>): boolean {
const type = String(getSpecType(spec)); return oneOf(getSpecType(spec), 'kafka', 'kinesis');
return (
type.startsWith('index_') ||
oneOf(type, 'index', 'compact', 'kill', 'append', 'merge', 'same_interval_merge')
);
} }
export function isDruidSource(spec: Partial<IngestionSpec>): boolean { export function isDruidSource(spec: Partial<IngestionSpec>): boolean {

View File

@ -21,7 +21,7 @@ import React from 'react';
import { AutoForm, ExternalLink, Field } from '../../components'; import { AutoForm, ExternalLink, Field } from '../../components';
import { getLink } from '../../links'; import { getLink } from '../../links';
import { oneOf, typeIs } from '../../utils'; import { compact, oneOf, typeIs } from '../../utils';
import { FlattenSpec } from '../flatten-spec/flatten-spec'; import { FlattenSpec } from '../flatten-spec/flatten-spec';
export interface InputFormat { export interface InputFormat {
@ -36,9 +36,12 @@ export interface InputFormat {
readonly flattenSpec?: FlattenSpec | null; readonly flattenSpec?: FlattenSpec | null;
readonly featureSpec?: Record<string, boolean>; readonly featureSpec?: Record<string, boolean>;
readonly keepNullColumns?: boolean; readonly keepNullColumns?: boolean;
readonly assumeNewlineDelimited?: boolean;
readonly useJsonNodeReader?: boolean;
} }
export const INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [ function generateInputFormatFields(streaming: boolean) {
return compact([
{ {
name: 'type', name: 'type',
label: 'Input format', label: 'Input format',
@ -79,6 +82,63 @@ export const INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
</> </>
), ),
}, },
streaming
? {
name: 'assumeNewlineDelimited',
type: 'boolean',
defined: typeIs('json'),
disabled: (inputFormat: InputFormat) => inputFormat.useJsonNodeReader,
defaultValue: false,
info: (
<>
<p>
In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single
JSON event spans multiple lines). However, if a parsing exception occurs, all JSON
events that are present in the same streaming record will be discarded.
</p>
<p>
<Code>assumeNewlineDelimited</Code> and <Code>useJsonNodeReader</Code> (at most one
can be <Code>true</Code>) affect only how parsing exceptions are handled.
</p>
<p>
If the input is known to be newline delimited JSON (each individual JSON event is
contained in a single line, separated by newlines), setting this option to true
allows for more flexible parsing exception handling. Only the lines with invalid
JSON syntax will be discarded, while lines containing valid JSON events will still
be ingested.
</p>
</>
),
}
: undefined,
streaming
? {
name: 'useJsonNodeReader',
type: 'boolean',
defined: typeIs('json'),
disabled: (inputFormat: InputFormat) => inputFormat.assumeNewlineDelimited,
defaultValue: false,
info: (
<>
{' '}
<p>
In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single
JSON event spans multiple lines). However, if a parsing exception occurs, all JSON
events that are present in the same streaming record will be discarded.
</p>
<p>
<Code>assumeNewlineDelimited</Code> and <Code>useJsonNodeReader</Code> (at most one
can be <Code>true</Code>) affect only how parsing exceptions are handled.
</p>
<p>
When ingesting multi-line JSON events, enabling this option will enable the use of a
JSON parser which will retain any valid JSON events encountered within a streaming
record prior to when a parsing exception occurred.
</p>
</>
),
}
: undefined,
{ {
name: 'delimiter', name: 'delimiter',
type: 'string', type: 'string',
@ -119,10 +179,10 @@ export const INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
info: ( info: (
<> <>
If this is set, find the column names from the header row. Note that If this is set, find the column names from the header row. Note that
<Code>skipHeaderRows</Code> will be applied before finding column names from the header. For <Code>skipHeaderRows</Code> will be applied before finding column names from the header.
example, if you set <Code>skipHeaderRows</Code> to 2 and <Code>findColumnsFromHeader</Code>{' '} For example, if you set <Code>skipHeaderRows</Code> to 2 and{' '}
to true, the task will skip the first two lines and then extract column information from the <Code>findColumnsFromHeader</Code> to true, the task will skip the first two lines and
third line. then extract column information from the third line.
</> </>
), ),
}, },
@ -134,8 +194,8 @@ export const INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
(oneOf(p.type, 'csv', 'tsv') && p.findColumnsFromHeader === false) || p.type === 'regex', (oneOf(p.type, 'csv', 'tsv') && p.findColumnsFromHeader === false) || p.type === 'regex',
info: ( info: (
<> <>
Specifies the columns of the data. The columns should be in the same order with the columns Specifies the columns of the data. The columns should be in the same order with the
of your data. columns of your data.
</> </>
), ),
}, },
@ -159,13 +219,17 @@ export const INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
</> </>
), ),
}, },
]; ] as (Field<InputFormat> | undefined)[]);
}
export const INPUT_FORMAT_FIELDS: Field<InputFormat>[] = generateInputFormatFields(false);
export const STREAMING_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = generateInputFormatFields(true);
export function issueWithInputFormat(inputFormat: InputFormat | undefined): string | undefined { export function issueWithInputFormat(inputFormat: InputFormat | undefined): string | undefined {
return AutoForm.issueWithModel(inputFormat, INPUT_FORMAT_FIELDS); return AutoForm.issueWithModel(inputFormat, INPUT_FORMAT_FIELDS);
} }
export const inputFormatCanFlatten: (inputFormat: InputFormat) => boolean = typeIs( export const inputFormatCanProduceNestedData: (inputFormat: InputFormat) => boolean = typeIs(
'json', 'json',
'parquet', 'parquet',
'orc', 'orc',

View File

@ -57,30 +57,36 @@ export const ConnectMessage = React.memo(function ConnectMessage(props: ConnectM
}); });
export interface ParserMessageProps { export interface ParserMessageProps {
canFlatten: boolean; canHaveNestedData: boolean;
} }
export const ParserMessage = React.memo(function ParserMessage(props: ParserMessageProps) { export const ParserMessage = React.memo(function ParserMessage(props: ParserMessageProps) {
const { canFlatten } = props; const { canHaveNestedData } = props;
return ( return (
<FormGroup> <FormGroup>
<Callout> <Callout>
<p> <p>
You can{' '} Druid needs to parse data as columns. Determine the format of your data and ensure that
<ExternalLink href={`${getLink('DOCS')}/querying/nested-columns.html`}> the columns are accurately parsed.
directly ingest nested data
</ExternalLink>{' '}
into COMPLEX&lt;json&gt; columns.
</p> </p>
{canFlatten && ( {canHaveNestedData && (
<>
<p> <p>
If you have nested data, you can{' '} If you have nested data, you can ingest it into{' '}
<ExternalLink href={`${getLink('DOCS')}/querying/nested-columns.html`}>
COMPLEX&lt;json&gt;
</ExternalLink>{' '}
columns.
</p>
<p>
Alternatively, you can explicitly{' '}
<ExternalLink href={`${getLink('DOCS')}/ingestion/index.html#flattenspec`}> <ExternalLink href={`${getLink('DOCS')}/ingestion/index.html#flattenspec`}>
flatten flatten
</ExternalLink>{' '} </ExternalLink>{' '}
it here. it here.
</p> </p>
</>
)} )}
<LearnMore href={`${getLink('DOCS')}/ingestion/data-formats.html`} /> <LearnMore href={`${getLink('DOCS')}/ingestion/data-formats.html`} />
</Callout> </Callout>

View File

@ -85,7 +85,6 @@ import {
getRequiredModule, getRequiredModule,
getRollup, getRollup,
getSecondaryPartitionRelatedFormFields, getSecondaryPartitionRelatedFormFields,
getSpecType,
getTimestampExpressionFields, getTimestampExpressionFields,
getTimestampSchema, getTimestampSchema,
getTuningFormFields, getTuningFormFields,
@ -93,15 +92,15 @@ import {
IngestionSpec, IngestionSpec,
INPUT_FORMAT_FIELDS, INPUT_FORMAT_FIELDS,
InputFormat, InputFormat,
inputFormatCanFlatten, inputFormatCanProduceNestedData,
invalidIoConfig, invalidIoConfig,
invalidPartitionConfig, invalidPartitionConfig,
IoConfig, IoConfig,
isDruidSource, isDruidSource,
isEmptyIngestionSpec, isEmptyIngestionSpec,
isStreamingSpec,
issueWithIoConfig, issueWithIoConfig,
issueWithSampleData, issueWithSampleData,
isTask,
joinFilter, joinFilter,
KNOWN_FILTER_TYPES, KNOWN_FILTER_TYPES,
MAX_INLINE_DATA_LENGTH, MAX_INLINE_DATA_LENGTH,
@ -113,6 +112,7 @@ import {
PRIMARY_PARTITION_RELATED_FORM_FIELDS, PRIMARY_PARTITION_RELATED_FORM_FIELDS,
removeTimestampTransform, removeTimestampTransform,
splitFilter, splitFilter,
STREAMING_INPUT_FORMAT_FIELDS,
TIME_COLUMN, TIME_COLUMN,
TIMESTAMP_SPEC_FIELDS, TIMESTAMP_SPEC_FIELDS,
TimestampSpec, TimestampSpec,
@ -140,7 +140,6 @@ import {
localStorageSetJson, localStorageSetJson,
moveElement, moveElement,
moveToIndex, moveToIndex,
oneOf,
pluralIfNeeded, pluralIfNeeded,
QueryState, QueryState,
} from '../../utils'; } from '../../utils';
@ -1205,7 +1204,6 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
renderConnectStep() { renderConnectStep() {
const { inputQueryState, sampleStrategy } = this.state; const { inputQueryState, sampleStrategy } = this.state;
const spec = this.getEffectiveSpec(); const spec = this.getEffectiveSpec();
const specType = getSpecType(spec);
const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT; const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;
const inlineMode = deepGet(spec, 'spec.ioConfig.inputSource.type') === 'inline'; const inlineMode = deepGet(spec, 'spec.ioConfig.inputSource.type') === 'inline';
const druidSource = isDruidSource(spec); const druidSource = isDruidSource(spec);
@ -1294,7 +1292,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
</Callout> </Callout>
</FormGroup> </FormGroup>
)} )}
{oneOf(specType, 'kafka', 'kinesis') && ( {isStreamingSpec(spec) && (
<FormGroup label="Where should the data be sampled from?"> <FormGroup label="Where should the data be sampled from?">
<RadioGroup <RadioGroup
selectedValue={sampleStrategy} selectedValue={sampleStrategy}
@ -1441,7 +1439,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
const flattenFields: FlattenField[] = const flattenFields: FlattenField[] =
deepGet(spec, 'spec.ioConfig.inputFormat.flattenSpec.fields') || EMPTY_ARRAY; deepGet(spec, 'spec.ioConfig.inputFormat.flattenSpec.fields') || EMPTY_ARRAY;
const canFlatten = inputFormatCanFlatten(inputFormat); const canHaveNestedData = inputFormatCanProduceNestedData(inputFormat);
let mainFill: JSX.Element | string; let mainFill: JSX.Element | string;
if (parserQueryState.isInit()) { if (parserQueryState.isInit()) {
@ -1460,7 +1458,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
onChange={columnFilter => this.setState({ columnFilter })} onChange={columnFilter => this.setState({ columnFilter })}
placeholder="Search columns" placeholder="Search columns"
/> />
{canFlatten && ( {canHaveNestedData && (
<Switch <Switch
checked={specialColumnsOnly} checked={specialColumnsOnly}
label="Flattened columns only" label="Flattened columns only"
@ -1473,7 +1471,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
<ParseDataTable <ParseDataTable
sampleData={data} sampleData={data}
columnFilter={columnFilter} columnFilter={columnFilter}
canFlatten={canFlatten} canFlatten={canHaveNestedData}
flattenedColumnsOnly={specialColumnsOnly} flattenedColumnsOnly={specialColumnsOnly}
flattenFields={flattenFields} flattenFields={flattenFields}
onFlattenFieldSelect={this.onFlattenFieldSelect} onFlattenFieldSelect={this.onFlattenFieldSelect}
@ -1488,22 +1486,25 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
} }
let suggestedFlattenFields: FlattenField[] | undefined; let suggestedFlattenFields: FlattenField[] | undefined;
if (canFlatten && !flattenFields.length && parserQueryState.data) { if (canHaveNestedData && !flattenFields.length && parserQueryState.data) {
suggestedFlattenFields = computeFlattenPathsForData( suggestedFlattenFields = computeFlattenPathsForData(
filterMap(parserQueryState.data.rows, r => r.input), filterMap(parserQueryState.data.rows, r => r.input),
'ignore-arrays', 'ignore-arrays',
); );
} }
const inputFormatFields = isStreamingSpec(spec)
? STREAMING_INPUT_FORMAT_FIELDS
: INPUT_FORMAT_FIELDS;
return ( return (
<> <>
<div className="main">{mainFill}</div> <div className="main">{mainFill}</div>
<div className="control"> <div className="control">
<ParserMessage canFlatten={canFlatten} /> <ParserMessage canHaveNestedData={canHaveNestedData} />
{!selectedFlattenField && ( {!selectedFlattenField && (
<> <>
<AutoForm <AutoForm
fields={INPUT_FORMAT_FIELDS} fields={inputFormatFields}
model={inputFormat} model={inputFormat}
onChange={p => onChange={p =>
this.updateSpecPreview(deepSet(spec, 'spec.ioConfig.inputFormat', p)) this.updateSpecPreview(deepSet(spec, 'spec.ioConfig.inputFormat', p))
@ -1511,11 +1512,11 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
/> />
{this.renderApplyButtonBar( {this.renderApplyButtonBar(
parserQueryState, parserQueryState,
AutoForm.issueWithModel(inputFormat, INPUT_FORMAT_FIELDS), AutoForm.issueWithModel(inputFormat, inputFormatFields),
)} )}
</> </>
)} )}
{canFlatten && this.renderFlattenControls()} {canHaveNestedData && this.renderFlattenControls()}
{suggestedFlattenFields && suggestedFlattenFields.length ? ( {suggestedFlattenFields && suggestedFlattenFields.length ? (
<FormGroup> <FormGroup>
<Button <Button
@ -1563,7 +1564,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
private readonly onFlattenFieldSelect = (field: FlattenField, index: number) => { private readonly onFlattenFieldSelect = (field: FlattenField, index: number) => {
const { spec, unsavedChange } = this.state; const { spec, unsavedChange } = this.state;
const inputFormat: InputFormat = deepGet(spec, 'spec.ioConfig.inputFormat') || EMPTY_OBJECT; const inputFormat: InputFormat = deepGet(spec, 'spec.ioConfig.inputFormat') || EMPTY_OBJECT;
if (unsavedChange || !inputFormatCanFlatten(inputFormat)) return; if (unsavedChange || !inputFormatCanProduceNestedData(inputFormat)) return;
this.setState({ this.setState({
selectedFlattenField: { value: field, index }, selectedFlattenField: { value: field, index },
@ -3265,7 +3266,27 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
if (submitting) return; if (submitting) return;
this.setState({ submitting: true }); this.setState({ submitting: true });
if (isTask(spec)) { if (isStreamingSpec(spec)) {
try {
await Api.instance.post('/druid/indexer/v1/supervisor', spec);
} catch (e) {
AppToaster.show({
message: `Failed to submit supervisor: ${getDruidErrorMessage(e)}`,
intent: Intent.DANGER,
});
this.setState({ submitting: false });
return;
}
AppToaster.show({
message: 'Supervisor submitted successfully. Going to task view...',
intent: Intent.SUCCESS,
});
setTimeout(() => {
goToIngestion(undefined); // Can we get the supervisor ID here?
}, 1000);
} else {
let taskResp: any; let taskResp: any;
try { try {
taskResp = await Api.instance.post('/druid/indexer/v1/task', spec); taskResp = await Api.instance.post('/druid/indexer/v1/task', spec);
@ -3286,26 +3307,6 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
setTimeout(() => { setTimeout(() => {
goToIngestion(taskResp.data.task); goToIngestion(taskResp.data.task);
}, 1000); }, 1000);
} else {
try {
await Api.instance.post('/druid/indexer/v1/supervisor', spec);
} catch (e) {
AppToaster.show({
message: `Failed to submit supervisor: ${getDruidErrorMessage(e)}`,
intent: Intent.DANGER,
});
this.setState({ submitting: false });
return;
}
AppToaster.show({
message: 'Supervisor submitted successfully. Going to task view...',
intent: Intent.SUCCESS,
});
setTimeout(() => {
goToIngestion(undefined); // Can we get the supervisor ID here?
}, 1000);
} }
}; };
} }