Web console: Fixed sampling for delta source in classic data loader and MSQ (#17160) (#17199)

This commit is contained in:
Abhishek Radhakrishnan 2024-09-30 10:25:38 -07:00 committed by GitHub
parent 7638d29c40
commit c6f41dcd22
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 152 additions and 137 deletions

View File

@ -56,7 +56,7 @@ function _build_distribution() {
(
# Add HEAD as an allowed HTTP method since this is how we check when the Druid service is ready.
cd "$(_get_code_root)" \
&& mvn -Pdist,skip-static-checks,skip-tests -Dmaven.javadoc.skip=true -q -T1C install \
&& mvn -Pdist,bundle-contrib-exts,skip-static-checks,skip-tests -Dforbiddenapis.skip=true -Dcheckstyle.skip=true -Dpmd.skip=true -Dmaven.javadoc.skip=true -Danimal.sniffer.skip=true -Denforcer.skip=true -Dcyclonedx.skip=true -q -T1C install \
&& cd distribution/target \
&& tar xzf "apache-druid-$(_get_druid_version)-bin.tar.gz" \
&& cd apache-druid-$(_get_druid_version) \
@ -64,7 +64,7 @@ function _build_distribution() {
&& cp "$(_get_code_root)/extensions-core/testing-tools/target/druid-testing-tools-$(_get_druid_version).jar" extensions/druid-testing-tools/ \
&& mkdir -p extensions/druid-compressed-bigdecimal \
&& cp "$(_get_code_root)/extensions-contrib/compressed-bigdecimal/target/druid-compressed-bigdecimal-$(_get_druid_version).jar" extensions/druid-compressed-bigdecimal/ \
&& echo -e "\n\ndruid.extensions.loadList=[\"druid-hdfs-storage\", \"druid-kafka-indexing-service\", \"druid-multi-stage-query\", \"druid-testing-tools\", \"druid-bloom-filter\", \"druid-datasketches\", \"druid-histogram\", \"druid-stats\", \"druid-compressed-bigdecimal\"]" >> conf/druid/auto/_common/common.runtime.properties \
&& echo -e "\n\ndruid.extensions.loadList=[\"druid-hdfs-storage\", \"druid-kafka-indexing-service\", \"druid-multi-stage-query\", \"druid-testing-tools\", \"druid-bloom-filter\", \"druid-datasketches\", \"druid-histogram\", \"druid-stats\", \"druid-compressed-bigdecimal\", \"druid-parquet-extensions\", \"druid-deltalake-extensions\"]" >> conf/druid/auto/_common/common.runtime.properties \
&& echo -e "\n\ndruid.server.http.allowedHttpMethods=[\"HEAD\"]" >> conf/druid/auto/_common/common.runtime.properties \
&& echo -e "\n\ndruid.export.storage.baseDir=/" >> conf/druid/auto/_common/common.runtime.properties \
)

View File

@ -25,9 +25,8 @@ import AceEditor from 'react-ace';
import './json-input.scss';
function parseHjson(str: string) {
// Throwing on empty input is more consistent with how JSON.parse works
if (str.trim() === '') throw new Error('empty hjson');
function parseHjson(str: string): any {
if (str.trim() === '') return;
return Hjson.parse(str);
}

View File

@ -395,6 +395,10 @@ export function isDruidSource(spec: Partial<IngestionSpec>): boolean {
return deepGet(spec, 'spec.ioConfig.inputSource.type') === 'druid';
}
export function isFixedFormatSource(spec: Partial<IngestionSpec>): boolean {
return oneOf(deepGet(spec, 'spec.ioConfig.inputSource.type'), 'druid', 'delta');
}
export function getPossibleSystemFieldsForSpec(spec: Partial<IngestionSpec>): string[] {
const inputSource = deepGet(spec, 'spec.ioConfig.inputSource');
if (!inputSource) return [];
@ -1061,7 +1065,6 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
label: 'Delta filter',
type: 'json',
placeholder: '{"type": "=", "column": "name", "value": "foo"}',
defaultValue: {},
info: (
<>
<ExternalLink
@ -1078,7 +1081,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
label: 'Delta snapshot version',
type: 'number',
placeholder: '(latest)',
defaultValue: {},
zeroMeansUndefined: true,
info: (
<>
The snapshot version to read from the Delta table. By default, the latest snapshot is
@ -1613,6 +1616,9 @@ export function guessDataSourceNameFromInputSource(inputSource: InputSource): st
return actualPath ? actualPath.path : uriPath ? filenameFromPath(uriPath) : undefined;
}
case 'delta':
return inputSource.tablePath ? filenameFromPath(inputSource.tablePath) : undefined;
case 'http':
return Array.isArray(inputSource.uris) ? filenameFromPath(inputSource.uris[0]) : undefined;

View File

@ -653,7 +653,6 @@ export const INPUT_SOURCE_FIELDS: Field<InputSource>[] = [
type: 'json',
placeholder: '{"type": "=", "column": "name", "value": "foo"}',
defined: typeIsKnown(KNOWN_TYPES, 'delta'),
required: false,
info: (
<>
<ExternalLink href={`${getLink('DOCS')}/ingestion/input-sources/#delta-filter-object`}>
@ -668,8 +667,8 @@ export const INPUT_SOURCE_FIELDS: Field<InputSource>[] = [
label: 'Delta snapshot version',
type: 'number',
placeholder: '(latest)',
zeroMeansUndefined: true,
defined: typeIsKnown(KNOWN_TYPES, 'delta'),
required: false,
info: (
<>
The snapshot version to read from the Delta table. By default, the latest snapshot is read.

View File

@ -17,6 +17,7 @@
*/
import { dedupe, F, SqlExpression, SqlFunction } from '@druid-toolkit/query';
import type { CancelToken } from 'axios';
import * as JSONBig from 'json-bigint-native';
import type {
@ -40,6 +41,7 @@ import {
getSpecType,
getTimestampSchema,
isDruidSource,
isFixedFormatSource,
PLACEHOLDER_TIMESTAMP_SPEC,
REINDEX_TIMESTAMP_SPEC,
TIME_COLUMN,
@ -187,12 +189,15 @@ export async function getProxyOverlordModules(): Promise<string[]> {
export async function postToSampler(
sampleSpec: SampleSpec,
forStr: string,
cancelToken?: CancelToken,
): Promise<SampleResponse> {
sampleSpec = fixSamplerLookups(fixSamplerTypes(sampleSpec));
let sampleResp: any;
try {
sampleResp = await Api.instance.post(`/druid/indexer/v1/sampler?for=${forStr}`, sampleSpec);
sampleResp = await Api.instance.post(`/druid/indexer/v1/sampler?for=${forStr}`, sampleSpec, {
cancelToken,
});
} catch (e) {
throw new Error(getDruidErrorMessage(e));
}
@ -269,8 +274,7 @@ export async function sampleForConnect(
sampleStrategy,
);
const reingestMode = isDruidSource(spec);
if (!reingestMode) {
if (!isFixedFormatSource(spec)) {
ioConfig = deepSet(
ioConfig,
'inputFormat',
@ -282,6 +286,7 @@ export async function sampleForConnect(
);
}
const reingestMode = isDruidSource(spec);
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
@ -290,7 +295,7 @@ export async function sampleForConnect(
dataSchema: {
dataSource: 'sample',
timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC,
dimensionsSpec: {},
dimensionsSpec: { useSchemaDiscovery: true },
granularitySpec: {
rollup: false,
},

View File

@ -115,6 +115,7 @@ import {
invalidPartitionConfig,
isDruidSource,
isEmptyIngestionSpec,
isFixedFormatSource,
isKafkaOrKinesis,
isStreamingSpec,
issueWithIoConfig,
@ -265,26 +266,27 @@ function showBlankLine(line: SampleEntry): string {
function formatSampleEntries(
sampleEntries: SampleEntry[],
specialSource: undefined | 'druid' | 'kafka' | 'kinesis',
): string {
if (!sampleEntries.length) return 'No data returned from sampler';
specialSource: undefined | 'fixedFormat' | 'druid' | 'kafka' | 'kinesis',
): string[] {
if (!sampleEntries.length) return ['No data returned from sampler'];
switch (specialSource) {
case 'fixedFormat':
return sampleEntries.map(l => JSONBig.stringify(l.parsed));
case 'druid':
return sampleEntries.map(showDruidLine).join('\n');
return sampleEntries.map(showDruidLine);
case 'kafka':
return sampleEntries.map(showKafkaLine).join('\n');
return sampleEntries.map(showKafkaLine);
case 'kinesis':
return sampleEntries.map(showKinesisLine).join('\n');
return sampleEntries.map(showKinesisLine);
default:
return (
sampleEntries.every(l => !l.parsed)
return sampleEntries.every(l => !l.parsed)
? sampleEntries.map(showBlankLine)
: sampleEntries.map(showRawLine)
).join('\n');
: sampleEntries.map(showRawLine);
}
}
@ -551,7 +553,6 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
isStepEnabled(step: Step): boolean {
const { spec, cacheRows } = this.state;
const druidSource = isDruidSource(spec);
const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;
switch (step) {
@ -559,10 +560,12 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
return Boolean(spec.type);
case 'parser':
return Boolean(!druidSource && spec.type && !issueWithIoConfig(ioConfig));
return Boolean(!isFixedFormatSource(spec) && spec.type && !issueWithIoConfig(ioConfig));
case 'timestamp':
return Boolean(!druidSource && cacheRows && deepGet(spec, 'spec.dataSchema.timestampSpec'));
return Boolean(
!isDruidSource(spec) && cacheRows && deepGet(spec, 'spec.dataSchema.timestampSpec'),
);
case 'transform':
case 'filter':
@ -1256,7 +1259,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
const deltaState: Partial<LoadDataViewState> = {
inputQueryState: new QueryState({ data: sampleResponse }),
};
if (isDruidSource(spec)) {
if (isFixedFormatSource(spec)) {
deltaState.cacheRows = getCacheRowsFromSampleResponse(sampleResponse);
}
this.setState(deltaState as LoadDataViewState);
@ -1268,8 +1271,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
const specType = getSpecType(spec);
const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;
const inlineMode = deepGet(spec, 'spec.ioConfig.inputSource.type') === 'inline';
const fixedFormatSource = isFixedFormatSource(spec);
const druidSource = isDruidSource(spec);
const specialSource = druidSource ? 'druid' : isKafkaOrKinesis(specType) ? specType : undefined;
const specialSource = druidSource
? 'druid'
: fixedFormatSource
? 'fixedFormat'
: isKafkaOrKinesis(specType)
? specType
: undefined;
let mainFill: JSX.Element | string;
if (inlineMode) {
@ -1301,7 +1311,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
<TextArea
className="raw-lines"
readOnly
value={formatSampleEntries(inputData, specialSource)}
value={formatSampleEntries(inputData, specialSource).join('\n')}
/>
)}
{inputQueryState.isLoading() && <Loader />}
@ -1373,7 +1383,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
</div>
{this.renderNextBar({
disabled: !inputQueryState.data,
nextStep: druidSource ? 'transform' : 'parser',
nextStep: druidSource ? 'transform' : fixedFormatSource ? 'timestamp' : 'parser',
onNextStep: () => {
if (!inputQueryState.data) return false;
const inputData = inputQueryState.data;
@ -1421,6 +1431,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
}
}
this.updateSpec(fillDataSourceNameIfNeeded(newSpec));
}
if (fixedFormatSource) {
const newSpec = deepSet(
spec,
'spec.dataSchema.timestampSpec',
getTimestampSpec(inputQueryState.data),
);
this.updateSpec(fillDataSourceNameIfNeeded(newSpec));
} else {
const issue = issueWithSampleData(
@ -1673,21 +1692,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
disabled: !parserQueryState.data,
onNextStep: () => {
if (!parserQueryState.data) return false;
let possibleTimestampSpec: TimestampSpec;
if (isDruidSource(spec)) {
possibleTimestampSpec = {
const possibleTimestampSpec = isDruidSource(spec)
? {
column: TIME_COLUMN,
format: 'auto',
};
} else {
possibleTimestampSpec = getTimestampSpec(parserQueryState.data);
}
: getTimestampSpec(parserQueryState.data);
if (possibleTimestampSpec) {
const newSpec = deepSet(spec, 'spec.dataSchema.timestampSpec', possibleTimestampSpec);
this.updateSpec(newSpec);
}
return true;
},
})}

View File

@ -256,7 +256,6 @@ export const SqlDataLoaderView = React.memo(function SqlDataLoaderView(
<TitleFrame title="Load data" subtitle="Select input type">
<InputSourceStep
initInputSource={inputSource}
mode="sampler"
onSet={(inputSource, inputFormat) => {
setExternalConfigStep({ inputSource, inputFormat });
}}

View File

@ -83,7 +83,6 @@ export const ConnectExternalDataDialog = React.memo(function ConnectExternalData
) : (
<InputSourceStep
initInputSource={inputSource}
mode="sampler"
onSet={(inputSource, inputFormat, partitionedByHint) => {
setExternalConfigStep({ inputSource, inputFormat, partitionedByHint });
}}

View File

@ -98,7 +98,9 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
const [previewState] = useQueryManager<InputSourceAndFormat, SampleResponse>({
query: inputSourceAndFormatToSample,
processQuery: async ({ inputSource, inputFormat }) => {
if (!isValidInputFormat(inputFormat)) throw new Error('invalid input format');
const fixedFormatSource = inputSource.type === 'delta';
if (!fixedFormatSource && !isValidInputFormat(inputFormat))
throw new Error('invalid input format');
const sampleSpec: SampleSpec = {
type: 'index_parallel',
@ -106,7 +108,9 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
ioConfig: {
type: 'index_parallel',
inputSource,
inputFormat: deepSet(inputFormat, 'keepNullColumns', true),
inputFormat: fixedFormatSource
? undefined
: (deepSet(inputFormat, 'keepNullColumns', true) as InputFormat),
},
dataSchema: {
dataSource: 'sample',
@ -196,6 +200,8 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
const needsResample = inputSourceAndFormatToSample !== inputSourceAndFormat;
const nextDisabled = !inputSourceFormatAndMore || needsResample;
const fixedFormatSource = inputSourceFormatAndMore?.inputSource.type === 'delta';
return (
<div className="input-format-step">
<div className="preview">
@ -227,6 +233,15 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
<LearnMore href={`${getLink('DOCS')}/ingestion/data-formats`} />
</Callout>
</FormGroup>
{fixedFormatSource ? (
<FormGroup>
<Callout>
The <Tag minimal>{inputSourceFormatAndMore?.inputSource.type}</Tag> input source has
a fixed format that can not be configured.
</Callout>
</FormGroup>
) : (
<>
<AutoForm
fields={BATCH_INPUT_FORMAT_FIELDS}
model={inputSourceAndFormat.inputFormat}
@ -249,6 +264,8 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
onChange={setInputSourceAndFormat as any}
/>
)}
</>
)}
{needsResample && (
<FormGroup className="control-buttons">
<Button

View File

@ -28,7 +28,6 @@ import {
} from '@blueprintjs/core';
import { IconNames } from '@blueprintjs/icons';
import type { QueryResult } from '@druid-toolkit/query';
import { SqlColumnDeclaration } from '@druid-toolkit/query';
import classNames from 'classnames';
import type { JSX } from 'react';
import React, { useEffect, useState } from 'react';
@ -37,7 +36,6 @@ import { AutoForm, ExternalLink } from '../../../components';
import { ShowValueDialog } from '../../../dialogs/show-value-dialog/show-value-dialog';
import type { Execution, ExecutionError, InputFormat, InputSource } from '../../../druid-models';
import {
externalConfigToTableExpression,
getIngestionImage,
getIngestionTitle,
guessSimpleInputFormat,
@ -45,11 +43,7 @@ import {
issueWithSampleData,
PLACEHOLDER_TIMESTAMP_SPEC,
} from '../../../druid-models';
import {
executionBackgroundResultStatusCheck,
extractResult,
submitTaskQuery,
} from '../../../helpers';
import { executionBackgroundResultStatusCheck } from '../../../helpers';
import { useQueryManager } from '../../../hooks';
import { AppToaster, UrlBaser } from '../../../singletons';
import { filterMap, IntermediateQueryState } from '../../../utils';
@ -61,17 +55,20 @@ import { InputSourceInfo } from './input-source-info';
import './input-source-step.scss';
const BOGUS_LIST_DELIMITER = '56616469-6de2-9da4-efb8-8f416e6e6965'; // Just a UUID to disable the list delimiter, let's hope we do not see this UUID in the data
const ROWS_TO_SAMPLE = 50;
const FIXED_FORMAT_FOR_SOURCE: Record<string, InputFormat> = {
delta: { type: 'parquet' },
};
function resultToInputFormat(result: QueryResult): InputFormat {
if (!result.rows.length) throw new Error('No data returned from sample query');
return guessSimpleInputFormat(result.rows.map((r: any) => r[0]));
}
const BOGUS_LIST_DELIMITER = '56616469-6de2-9da4-efb8-8f416e6e6965'; // Just a UUID to disable the list delimiter, let's hope we do not see this UUID in the data
const ROWS_TO_SAMPLE = 50;
export interface InputSourceStepProps {
initInputSource: Partial<InputSource> | undefined;
mode: 'sampler' | 'msq';
onSet(
inputSource: InputSource,
inputFormat: InputFormat,
@ -80,7 +77,7 @@ export interface InputSourceStepProps {
}
export const InputSourceStep = React.memo(function InputSourceStep(props: InputSourceStepProps) {
const { initInputSource, mode, onSet } = props;
const { initInputSource, onSet } = props;
const [stackToShow, setStackToShow] = useState<string | undefined>();
const [inputSource, setInputSource] = useState<Partial<InputSource> | string | undefined>(
@ -94,15 +91,17 @@ export const InputSourceStep = React.memo(function InputSourceStep(props: InputS
Execution
>({
processQuery: async ({ inputSource, suggestedInputFormat }, cancelToken) => {
let sampleLines: string[];
if (mode === 'sampler') {
const fixedFormat = FIXED_FORMAT_FOR_SOURCE['delta'];
const sampleSpec: SampleSpec = {
type: 'index_parallel',
spec: {
ioConfig: {
type: 'index_parallel',
inputSource,
inputFormat: {
inputFormat: fixedFormat
? undefined
: {
type: 'regex',
pattern: '([\\s\\S]*)', // Match the entire line, every single character
listDelimiter: BOGUS_LIST_DELIMITER,
@ -124,36 +123,14 @@ export const InputSourceStep = React.memo(function InputSourceStep(props: InputS
},
};
const sampleResponse = await postToSampler(sampleSpec, 'input-source-step');
sampleLines = filterMap(sampleResponse.data, l => (l.input ? l.input.raw : undefined));
} else {
const tableExpression = externalConfigToTableExpression({
inputSource,
inputFormat: {
type: 'regex',
pattern: '([\\s\\S]*)',
listDelimiter: BOGUS_LIST_DELIMITER,
columns: ['raw'],
},
signature: [SqlColumnDeclaration.create('raw', 'VARCHAR')],
});
const result = extractResult(
await submitTaskQuery({
query: `SELECT REPLACE(raw, U&'\\0000', '') AS "raw" FROM ${tableExpression}`, // Make sure to remove possible \u0000 chars as they are not allowed and will produce an InvalidNullByte error message
context: {
sqlOuterLimit: ROWS_TO_SAMPLE,
},
cancelToken,
}),
const sampleResponse = await postToSampler(sampleSpec, 'input-source-step', cancelToken);
const sampleLines = filterMap(
sampleResponse.data,
fixedFormat ? l => l.input : l => (l.input ? l.input.raw : undefined),
);
if (result instanceof IntermediateQueryState) return result;
sampleLines = result.rows.map((r: string[]) => r[0]);
}
if (!sampleLines.length) throw new Error('No data returned from sampler');
if (fixedFormat) return fixedFormat;
const issue = issueWithSampleData(sampleLines, false);
if (issue) {
@ -226,6 +203,7 @@ export const InputSourceStep = React.memo(function InputSourceStep(props: InputS
{renderIngestionCard('s3')}
{renderIngestionCard('azureStorage')}
{renderIngestionCard('google')}
{renderIngestionCard('delta')}
{renderIngestionCard('hdfs')}
{renderIngestionCard('http')}
{renderIngestionCard('local')}