diff --git a/web-console/script/druid b/web-console/script/druid index 7b3163bf9d6..122febaf049 100755 --- a/web-console/script/druid +++ b/web-console/script/druid @@ -56,7 +56,7 @@ function _build_distribution() { ( # Add HEAD as an allowed HTTP method since this is how we check when the Druid service is ready. cd "$(_get_code_root)" \ - && mvn -Pdist,skip-static-checks,skip-tests -Dmaven.javadoc.skip=true -q -T1C install \ + && mvn -Pdist,bundle-contrib-exts,skip-static-checks,skip-tests -Dforbiddenapis.skip=true -Dcheckstyle.skip=true -Dpmd.skip=true -Dmaven.javadoc.skip=true -Danimal.sniffer.skip=true -Denforcer.skip=true -Dcyclonedx.skip=true -q -T1C install \ && cd distribution/target \ && tar xzf "apache-druid-$(_get_druid_version)-bin.tar.gz" \ && cd apache-druid-$(_get_druid_version) \ @@ -64,7 +64,7 @@ function _build_distribution() { && cp "$(_get_code_root)/extensions-core/testing-tools/target/druid-testing-tools-$(_get_druid_version).jar" extensions/druid-testing-tools/ \ && mkdir -p extensions/druid-compressed-bigdecimal \ && cp "$(_get_code_root)/extensions-contrib/compressed-bigdecimal/target/druid-compressed-bigdecimal-$(_get_druid_version).jar" extensions/druid-compressed-bigdecimal/ \ - && echo -e "\n\ndruid.extensions.loadList=[\"druid-hdfs-storage\", \"druid-kafka-indexing-service\", \"druid-multi-stage-query\", \"druid-testing-tools\", \"druid-bloom-filter\", \"druid-datasketches\", \"druid-histogram\", \"druid-stats\", \"druid-compressed-bigdecimal\"]" >> conf/druid/auto/_common/common.runtime.properties \ + && echo -e "\n\ndruid.extensions.loadList=[\"druid-hdfs-storage\", \"druid-kafka-indexing-service\", \"druid-multi-stage-query\", \"druid-testing-tools\", \"druid-bloom-filter\", \"druid-datasketches\", \"druid-histogram\", \"druid-stats\", \"druid-compressed-bigdecimal\", \"druid-parquet-extensions\", \"druid-deltalake-extensions\"]" >> conf/druid/auto/_common/common.runtime.properties \ && echo -e "\n\ndruid.server.http.allowedHttpMethods=[\"HEAD\"]" >> conf/druid/auto/_common/common.runtime.properties \ && echo -e "\n\ndruid.export.storage.baseDir=/" >> conf/druid/auto/_common/common.runtime.properties \ ) diff --git a/web-console/src/components/json-input/json-input.tsx b/web-console/src/components/json-input/json-input.tsx index f4ffddc1114..578cab3ebcc 100644 --- a/web-console/src/components/json-input/json-input.tsx +++ b/web-console/src/components/json-input/json-input.tsx @@ -25,9 +25,8 @@ import AceEditor from 'react-ace'; import './json-input.scss'; -function parseHjson(str: string) { - // Throwing on empty input is more consistent with how JSON.parse works - if (str.trim() === '') throw new Error('empty hjson'); +function parseHjson(str: string): any { + if (str.trim() === '') return; return Hjson.parse(str); } diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx index 65a7ed3b6c4..46d6a3a4cbb 100644 --- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx +++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx @@ -395,6 +395,10 @@ export function isDruidSource(spec: Partial): boolean { return deepGet(spec, 'spec.ioConfig.inputSource.type') === 'druid'; } +export function isFixedFormatSource(spec: Partial): boolean { + return oneOf(deepGet(spec, 'spec.ioConfig.inputSource.type'), 'druid', 'delta'); +} + export function getPossibleSystemFieldsForSpec(spec: Partial): string[] { const inputSource = deepGet(spec, 'spec.ioConfig.inputSource'); if (!inputSource) return []; @@ -1061,7 +1065,6 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F label: 'Delta filter', type: 'json', placeholder: '{"type": "=", "column": "name", "value": "foo"}', - defaultValue: {}, info: ( <> The snapshot version to read from the Delta table. By default, the latest snapshot is @@ -1613,6 +1616,9 @@ export function guessDataSourceNameFromInputSource(inputSource: InputSource): st return actualPath ? actualPath.path : uriPath ? filenameFromPath(uriPath) : undefined; } + case 'delta': + return inputSource.tablePath ? filenameFromPath(inputSource.tablePath) : undefined; + case 'http': return Array.isArray(inputSource.uris) ? filenameFromPath(inputSource.uris[0]) : undefined; diff --git a/web-console/src/druid-models/input-source/input-source.tsx b/web-console/src/druid-models/input-source/input-source.tsx index eb19e4575db..6ad3b8e0a1a 100644 --- a/web-console/src/druid-models/input-source/input-source.tsx +++ b/web-console/src/druid-models/input-source/input-source.tsx @@ -653,7 +653,6 @@ export const INPUT_SOURCE_FIELDS: Field[] = [ type: 'json', placeholder: '{"type": "=", "column": "name", "value": "foo"}', defined: typeIsKnown(KNOWN_TYPES, 'delta'), - required: false, info: ( <> @@ -668,8 +667,8 @@ export const INPUT_SOURCE_FIELDS: Field[] = [ label: 'Delta snapshot version', type: 'number', placeholder: '(latest)', + zeroMeansUndefined: true, defined: typeIsKnown(KNOWN_TYPES, 'delta'), - required: false, info: ( <> The snapshot version to read from the Delta table. By default, the latest snapshot is read. diff --git a/web-console/src/utils/sampler.ts b/web-console/src/utils/sampler.ts index f74acf024c9..7fd3d67364c 100644 --- a/web-console/src/utils/sampler.ts +++ b/web-console/src/utils/sampler.ts @@ -17,6 +17,7 @@ */ import { dedupe, F, SqlExpression, SqlFunction } from '@druid-toolkit/query'; +import type { CancelToken } from 'axios'; import * as JSONBig from 'json-bigint-native'; import type { @@ -40,6 +41,7 @@ import { getSpecType, getTimestampSchema, isDruidSource, + isFixedFormatSource, PLACEHOLDER_TIMESTAMP_SPEC, REINDEX_TIMESTAMP_SPEC, TIME_COLUMN, @@ -187,12 +189,15 @@ export async function getProxyOverlordModules(): Promise { export async function postToSampler( sampleSpec: SampleSpec, forStr: string, + cancelToken?: CancelToken, ): Promise { sampleSpec = fixSamplerLookups(fixSamplerTypes(sampleSpec)); let sampleResp: any; try { - sampleResp = await Api.instance.post(`/druid/indexer/v1/sampler?for=${forStr}`, sampleSpec); + sampleResp = await Api.instance.post(`/druid/indexer/v1/sampler?for=${forStr}`, sampleSpec, { + cancelToken, + }); } catch (e) { throw new Error(getDruidErrorMessage(e)); } @@ -269,8 +274,7 @@ export async function sampleForConnect( sampleStrategy, ); - const reingestMode = isDruidSource(spec); - if (!reingestMode) { + if (!isFixedFormatSource(spec)) { ioConfig = deepSet( ioConfig, 'inputFormat', @@ -282,6 +286,7 @@ export async function sampleForConnect( ); } + const reingestMode = isDruidSource(spec); const sampleSpec: SampleSpec = { type: samplerType, spec: { @@ -290,7 +295,7 @@ export async function sampleForConnect( dataSchema: { dataSource: 'sample', timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC, - dimensionsSpec: {}, + dimensionsSpec: { useSchemaDiscovery: true }, granularitySpec: { rollup: false, }, diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx index 42cfa25a7b7..bcb79d639f2 100644 --- a/web-console/src/views/load-data-view/load-data-view.tsx +++ b/web-console/src/views/load-data-view/load-data-view.tsx @@ -115,6 +115,7 @@ import { invalidPartitionConfig, isDruidSource, isEmptyIngestionSpec, + isFixedFormatSource, isKafkaOrKinesis, isStreamingSpec, issueWithIoConfig, @@ -265,26 +266,27 @@ function showBlankLine(line: SampleEntry): string { function formatSampleEntries( sampleEntries: SampleEntry[], - specialSource: undefined | 'druid' | 'kafka' | 'kinesis', -): string { - if (!sampleEntries.length) return 'No data returned from sampler'; + specialSource: undefined | 'fixedFormat' | 'druid' | 'kafka' | 'kinesis', +): string[] { + if (!sampleEntries.length) return ['No data returned from sampler']; switch (specialSource) { + case 'fixedFormat': + return sampleEntries.map(l => JSONBig.stringify(l.parsed)); + case 'druid': - return sampleEntries.map(showDruidLine).join('\n'); + return sampleEntries.map(showDruidLine); case 'kafka': - return sampleEntries.map(showKafkaLine).join('\n'); + return sampleEntries.map(showKafkaLine); case 'kinesis': - return sampleEntries.map(showKinesisLine).join('\n'); + return sampleEntries.map(showKinesisLine); default: - return ( - sampleEntries.every(l => !l.parsed) - ? sampleEntries.map(showBlankLine) - : sampleEntries.map(showRawLine) - ).join('\n'); + return sampleEntries.every(l => !l.parsed) + ? sampleEntries.map(showBlankLine) + : sampleEntries.map(showRawLine); } } @@ -551,7 +553,6 @@ export class LoadDataView extends React.PureComponent = { inputQueryState: new QueryState({ data: sampleResponse }), }; - if (isDruidSource(spec)) { + if (isFixedFormatSource(spec)) { deltaState.cacheRows = getCacheRowsFromSampleResponse(sampleResponse); } this.setState(deltaState as LoadDataViewState); @@ -1268,8 +1271,15 @@ export class LoadDataView extends React.PureComponent )} {inputQueryState.isLoading() && } @@ -1373,7 +1383,7 @@ export class LoadDataView extends React.PureComponent {this.renderNextBar({ disabled: !inputQueryState.data, - nextStep: druidSource ? 'transform' : 'parser', + nextStep: druidSource ? 'transform' : fixedFormatSource ? 'timestamp' : 'parser', onNextStep: () => { if (!inputQueryState.data) return false; const inputData = inputQueryState.data; @@ -1421,6 +1431,15 @@ export class LoadDataView extends React.PureComponent { if (!parserQueryState.data) return false; - let possibleTimestampSpec: TimestampSpec; - if (isDruidSource(spec)) { - possibleTimestampSpec = { - column: TIME_COLUMN, - format: 'auto', - }; - } else { - possibleTimestampSpec = getTimestampSpec(parserQueryState.data); - } - - if (possibleTimestampSpec) { - const newSpec = deepSet(spec, 'spec.dataSchema.timestampSpec', possibleTimestampSpec); - this.updateSpec(newSpec); - } + const possibleTimestampSpec = isDruidSource(spec) + ? { + column: TIME_COLUMN, + format: 'auto', + } + : getTimestampSpec(parserQueryState.data); + const newSpec = deepSet(spec, 'spec.dataSchema.timestampSpec', possibleTimestampSpec); + this.updateSpec(newSpec); return true; }, })} diff --git a/web-console/src/views/sql-data-loader-view/sql-data-loader-view.tsx b/web-console/src/views/sql-data-loader-view/sql-data-loader-view.tsx index a7d6adac71e..8238384d9ae 100644 --- a/web-console/src/views/sql-data-loader-view/sql-data-loader-view.tsx +++ b/web-console/src/views/sql-data-loader-view/sql-data-loader-view.tsx @@ -256,7 +256,6 @@ export const SqlDataLoaderView = React.memo(function SqlDataLoaderView( { setExternalConfigStep({ inputSource, inputFormat }); }} diff --git a/web-console/src/views/workbench-view/connect-external-data-dialog/connect-external-data-dialog.tsx b/web-console/src/views/workbench-view/connect-external-data-dialog/connect-external-data-dialog.tsx index 4302c3783b2..124b4441194 100644 --- a/web-console/src/views/workbench-view/connect-external-data-dialog/connect-external-data-dialog.tsx +++ b/web-console/src/views/workbench-view/connect-external-data-dialog/connect-external-data-dialog.tsx @@ -83,7 +83,6 @@ export const ConnectExternalDataDialog = React.memo(function ConnectExternalData ) : ( { setExternalConfigStep({ inputSource, inputFormat, partitionedByHint }); }} diff --git a/web-console/src/views/workbench-view/input-format-step/input-format-step.tsx b/web-console/src/views/workbench-view/input-format-step/input-format-step.tsx index 3586170d63d..2f054a1543e 100644 --- a/web-console/src/views/workbench-view/input-format-step/input-format-step.tsx +++ b/web-console/src/views/workbench-view/input-format-step/input-format-step.tsx @@ -98,7 +98,9 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF const [previewState] = useQueryManager({ query: inputSourceAndFormatToSample, processQuery: async ({ inputSource, inputFormat }) => { - if (!isValidInputFormat(inputFormat)) throw new Error('invalid input format'); + const fixedFormatSource = inputSource.type === 'delta'; + if (!fixedFormatSource && !isValidInputFormat(inputFormat)) + throw new Error('invalid input format'); const sampleSpec: SampleSpec = { type: 'index_parallel', @@ -106,7 +108,9 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF ioConfig: { type: 'index_parallel', inputSource, - inputFormat: deepSet(inputFormat, 'keepNullColumns', true), + inputFormat: fixedFormatSource + ? undefined + : (deepSet(inputFormat, 'keepNullColumns', true) as InputFormat), }, dataSchema: { dataSource: 'sample', @@ -196,6 +200,8 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF const needsResample = inputSourceAndFormatToSample !== inputSourceAndFormat; const nextDisabled = !inputSourceFormatAndMore || needsResample; + const fixedFormatSource = inputSourceFormatAndMore?.inputSource.type === 'delta'; + return (
@@ -227,27 +233,38 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF - - setInputSourceAndFormat({ ...inputSourceAndFormat, inputFormat }) - } - /> - {possibleSystemFields.length > 0 && ( - + {fixedFormatSource ? ( + + + The {inputSourceFormatAndMore?.inputSource.type} input source has + a fixed format that can not be configured. + + + ) : ( + <> + + setInputSourceAndFormat({ ...inputSourceAndFormat, inputFormat }) + } + /> + {possibleSystemFields.length > 0 && ( + + )} + )} {needsResample && ( diff --git a/web-console/src/views/workbench-view/input-source-step/input-source-step.tsx b/web-console/src/views/workbench-view/input-source-step/input-source-step.tsx index 6d483cf4410..8beed74fa27 100644 --- a/web-console/src/views/workbench-view/input-source-step/input-source-step.tsx +++ b/web-console/src/views/workbench-view/input-source-step/input-source-step.tsx @@ -28,7 +28,6 @@ import { } from '@blueprintjs/core'; import { IconNames } from '@blueprintjs/icons'; import type { QueryResult } from '@druid-toolkit/query'; -import { SqlColumnDeclaration } from '@druid-toolkit/query'; import classNames from 'classnames'; import type { JSX } from 'react'; import React, { useEffect, useState } from 'react'; @@ -37,7 +36,6 @@ import { AutoForm, ExternalLink } from '../../../components'; import { ShowValueDialog } from '../../../dialogs/show-value-dialog/show-value-dialog'; import type { Execution, ExecutionError, InputFormat, InputSource } from '../../../druid-models'; import { - externalConfigToTableExpression, getIngestionImage, getIngestionTitle, guessSimpleInputFormat, @@ -45,11 +43,7 @@ import { issueWithSampleData, PLACEHOLDER_TIMESTAMP_SPEC, } from '../../../druid-models'; -import { - executionBackgroundResultStatusCheck, - extractResult, - submitTaskQuery, -} from '../../../helpers'; +import { executionBackgroundResultStatusCheck } from '../../../helpers'; import { useQueryManager } from '../../../hooks'; import { AppToaster, UrlBaser } from '../../../singletons'; import { filterMap, IntermediateQueryState } from '../../../utils'; @@ -61,17 +55,20 @@ import { InputSourceInfo } from './input-source-info'; import './input-source-step.scss'; +const BOGUS_LIST_DELIMITER = '56616469-6de2-9da4-efb8-8f416e6e6965'; // Just a UUID to disable the list delimiter, let's hope we do not see this UUID in the data +const ROWS_TO_SAMPLE = 50; + +const FIXED_FORMAT_FOR_SOURCE: Record = { + delta: { type: 'parquet' }, +}; + function resultToInputFormat(result: QueryResult): InputFormat { if (!result.rows.length) throw new Error('No data returned from sample query'); return guessSimpleInputFormat(result.rows.map((r: any) => r[0])); } -const BOGUS_LIST_DELIMITER = '56616469-6de2-9da4-efb8-8f416e6e6965'; // Just a UUID to disable the list delimiter, let's hope we do not see this UUID in the data -const ROWS_TO_SAMPLE = 50; - export interface InputSourceStepProps { initInputSource: Partial | undefined; - mode: 'sampler' | 'msq'; onSet( inputSource: InputSource, inputFormat: InputFormat, @@ -80,7 +77,7 @@ export interface InputSourceStepProps { } export const InputSourceStep = React.memo(function InputSourceStep(props: InputSourceStepProps) { - const { initInputSource, mode, onSet } = props; + const { initInputSource, onSet } = props; const [stackToShow, setStackToShow] = useState(); const [inputSource, setInputSource] = useState | string | undefined>( @@ -94,66 +91,46 @@ export const InputSourceStep = React.memo(function InputSourceStep(props: InputS Execution >({ processQuery: async ({ inputSource, suggestedInputFormat }, cancelToken) => { - let sampleLines: string[]; - if (mode === 'sampler') { - const sampleSpec: SampleSpec = { - type: 'index_parallel', - spec: { - ioConfig: { - type: 'index_parallel', - inputSource, - inputFormat: { - type: 'regex', - pattern: '([\\s\\S]*)', // Match the entire line, every single character - listDelimiter: BOGUS_LIST_DELIMITER, - columns: ['raw'], - }, - }, - dataSchema: { - dataSource: 'sample', - timestampSpec: PLACEHOLDER_TIMESTAMP_SPEC, - dimensionsSpec: {}, - granularitySpec: { - rollup: false, - }, + const fixedFormat = FIXED_FORMAT_FOR_SOURCE['delta']; + + const sampleSpec: SampleSpec = { + type: 'index_parallel', + spec: { + ioConfig: { + type: 'index_parallel', + inputSource, + inputFormat: fixedFormat + ? undefined + : { + type: 'regex', + pattern: '([\\s\\S]*)', // Match the entire line, every single character + listDelimiter: BOGUS_LIST_DELIMITER, + columns: ['raw'], + }, + }, + dataSchema: { + dataSource: 'sample', + timestampSpec: PLACEHOLDER_TIMESTAMP_SPEC, + dimensionsSpec: {}, + granularitySpec: { + rollup: false, }, }, - samplerConfig: { - numRows: ROWS_TO_SAMPLE, - timeoutMs: 15000, - }, - }; + }, + samplerConfig: { + numRows: ROWS_TO_SAMPLE, + timeoutMs: 15000, + }, + }; - const sampleResponse = await postToSampler(sampleSpec, 'input-source-step'); - - sampleLines = filterMap(sampleResponse.data, l => (l.input ? l.input.raw : undefined)); - } else { - const tableExpression = externalConfigToTableExpression({ - inputSource, - inputFormat: { - type: 'regex', - pattern: '([\\s\\S]*)', - listDelimiter: BOGUS_LIST_DELIMITER, - columns: ['raw'], - }, - signature: [SqlColumnDeclaration.create('raw', 'VARCHAR')], - }); - - const result = extractResult( - await submitTaskQuery({ - query: `SELECT REPLACE(raw, U&'\\0000', '') AS "raw" FROM ${tableExpression}`, // Make sure to remove possible \u0000 chars as they are not allowed and will produce an InvalidNullByte error message - context: { - sqlOuterLimit: ROWS_TO_SAMPLE, - }, - cancelToken, - }), - ); - - if (result instanceof IntermediateQueryState) return result; - sampleLines = result.rows.map((r: string[]) => r[0]); - } + const sampleResponse = await postToSampler(sampleSpec, 'input-source-step', cancelToken); + const sampleLines = filterMap( + sampleResponse.data, + fixedFormat ? l => l.input : l => (l.input ? l.input.raw : undefined), + ); if (!sampleLines.length) throw new Error('No data returned from sampler'); + if (fixedFormat) return fixedFormat; const issue = issueWithSampleData(sampleLines, false); if (issue) { @@ -226,6 +203,7 @@ export const InputSourceStep = React.memo(function InputSourceStep(props: InputS {renderIngestionCard('s3')} {renderIngestionCard('azureStorage')} {renderIngestionCard('google')} + {renderIngestionCard('delta')} {renderIngestionCard('hdfs')} {renderIngestionCard('http')} {renderIngestionCard('local')}