Web console: use new sampler features (#14017)

* use new sampler features

* supprot kafka format

* update DQT, fix tests

* prefer non numeric formats

* fix input format step

* boost SQL data loader

* delete dimension in auto discover mode

* inline example specs

* feedback updates

* yeet the format into valueFormat when switching to kafka

* kafka format is now a toggle

* even better form layout

* rename
This commit is contained in:
Vadim Ogievetsky 2023-04-07 06:28:29 -07:00 committed by GitHub
parent f41468fd46
commit 5ee4ecee62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
51 changed files with 15626 additions and 967 deletions

View File

@ -38,6 +38,7 @@ This topic covers how to submit a supervisor spec to ingest event data, also kno
- For a walk-through, see the [Loading from Apache Kafka](../../tutorials/tutorial-kafka.md) tutorial.
## Kafka support
The Kafka indexing service supports transactional topics introduced in Kafka 0.11.x by default. The consumer for Kafka indexing service is incompatible with older Kafka brokers. If you are using an older version, refer to the [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade).
Additionally, you can set `isolation.level` to `read_uncommitted` in `consumerProperties` if either:
@ -51,6 +52,7 @@ If your Kafka cluster enables consumer-group based ACLs, you can set `group.id`
To use the Kafka indexing service, load the `druid-kafka-indexing-service` extension on both the Overlord and the MiddleManagers. See [Loading extensions](../extensions.md#loading-extensions) for instructions on how to configure extensions.
## Define a supervisor spec
Similar to the ingestion spec for batch ingestion, the supervisor spec configures the data ingestion for Kafka streaming ingestion. A supervisor spec has the following sections:
- `dataSchema` to specify the Druid datasource name, primary timestamp, dimensions, metrics, transforms, and any necessary filters.
- `ioConfig` to configure Kafka connection settings and configure how Druid parses the data. Kafka-specific connection details go in the `consumerProperties`. The `ioConfig` is also where you define the input format (`inputFormat`) of your Kafka data. For supported formats for Kafka and information on how to configure the input format, see [Data formats](../../ingestion/data-formats.md).
@ -128,6 +130,7 @@ The following example demonstrates a supervisor spec for Kafka that uses the `JS
```
### Kafka input format supervisor spec example
If you want to ingest data from other fields in addition to the Kafka message contents, you can use the `kafka` input format. The `kafka` input format lets you ingest:
- the event key field
- event headers
@ -141,7 +144,7 @@ For example, consider the following structure for a message that represents a fi
- **Event timestamp**: "Nov. 10, 2021 at 14:06"
When you use the `kafka` input format, you configure the way that Druid names the dimensions created from the Kafka message:
- `headerLabelPrefix`: Supply a prefix to the Kafka headers to avoid any conflicts with named dimensions. The default is `kafka.header`. Considering the header from the example, Druid maps the header to the following column: `kafka.header.environment`.
- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any conflicts with named dimensions. The default is `kafka.header`. Considering the header from the example, Druid maps the header to the following column: `kafka.header.environment`.
- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the Druid schema to avoid conflicts with other time columns. The default is `kafka.timestamp`.
- `keyColumnName`: Supply the name for the Kafka key column in Druid. The default is `kafka.key`.
Additionally, you must provide information about how Druid should parse the data in the Kafka message:
@ -159,7 +162,7 @@ Additionally, you must provide information about how Druid should parse the data
For more information on data formats, see [Data formats](../../ingestion/data-formats.md).
Finally, add the Kafka message columns to the `dimensionsSpec`. For the key and timestamp, you can use the dimension names you defined for `keyColumnName` and `timestampColumnName`. For header dimensions, append the header key to the `headerLabelPrefix`. For example `kafka.header.environment`.
Finally, add the Kafka message columns to the `dimensionsSpec`. For the key and timestamp, you can use the dimension names you defined for `keyColumnName` and `timestampColumnName`. For header dimensions, append the header key to the `headerColumnPrefix`. For example `kafka.header.environment`.
The following supervisor spec demonstrates how to ingest the Kafka header, key, and timestamp into Druid dimensions:
```
@ -174,7 +177,7 @@ The following supervisor spec demonstrates how to ingest the Kafka header, key,
"topic": "wiki-edits",
"inputFormat": {
"type": "kafka",
"headerLabelPrefix": "kafka.header.",
"headerColumnPrefix": "kafka.header.",
"timestampColumnName": "kafka.timestamp",
"keyColumnName": "kafka.key",
"headerFormat": {

View File

@ -170,7 +170,7 @@ Configure the Kafka `inputFormat` to load complete kafka records including heade
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| `type` | String | Set value to `kafka`. | yes |
| `headerLabelPrefix` | String | Custom label prefix for all the header columns. | no (default = "kafka.header.") |
| `headerColumnPrefix` | String | Custom prefix for all the header columns. | no (default = "kafka.header.") |
| `timestampColumnName` | String | Name of the column for the kafka record's timestamp.| no (default = "kafka.timestamp") |
| `keyColumnName` | String | Name of the column for the kafka record's key.| no (default = "kafka.key") |
| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka headers. Supports String types. Because Kafka header values are bytes, the parser decodes them as UTF-8 encoded strings. To change this behavior, implement your own parser based on the encoding style. Change the 'encoding' type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
@ -183,7 +183,7 @@ For example:
"ioConfig": {
"inputFormat": {
"type": "kafka",
"headerLabelPrefix": "kafka.header.",
"headerColumnPrefix": "kafka.header.",
"timestampColumnName": "kafka.timestamp",
"keyColumnName": "kafka.key",
"headerFormat":

View File

@ -5733,7 +5733,7 @@ license_category: binary
module: web-console
license_name: Apache License version 2.0
copyright: Imply Data
version: 0.18.2
version: 0.18.3
---

View File

@ -116,7 +116,7 @@ function validateConnectLocalData(preview: string) {
expect(lines.length).toBe(500);
const firstLine = lines[0];
expect(firstLine).toBe(
'Druid row: {' +
'[Druid row: {' +
'"__time":1442018818771' +
',"channel":"#en.wikipedia"' +
',"comment":"added project"' +
@ -131,11 +131,11 @@ function validateConnectLocalData(preview: string) {
',"added":36' +
',"deleted":0' +
',"delta":36' +
'}',
'}]',
);
const lastLine = lines[lines.length - 1];
expect(lastLine).toBe(
'Druid row: {' +
'[Druid row: {' +
'"__time":1442020314823' +
',"channel":"#en.wikipedia"' +
',"comment":"/* History */[[WP:AWB/T|Typo fixing]], [[WP:AWB/T|typo(s) fixed]]: nothern → northern using [[Project:AWB|AWB]]"' +
@ -150,7 +150,7 @@ function validateConnectLocalData(preview: string) {
',"added":1' +
',"deleted":0' +
',"delta":1' +
'}',
'}]',
);
}

View File

@ -22,7 +22,7 @@
"d3-axis": "^2.1.0",
"d3-scale": "^3.3.0",
"d3-selection": "^2.0.0",
"druid-query-toolkit": "^0.18.2",
"druid-query-toolkit": "^0.18.3",
"file-saver": "^2.0.2",
"follow-redirects": "^1.14.7",
"fontsource-open-sans": "^3.0.9",
@ -8211,9 +8211,9 @@
}
},
"node_modules/druid-query-toolkit": {
"version": "0.18.2",
"resolved": "https://registry.npmjs.org/druid-query-toolkit/-/druid-query-toolkit-0.18.2.tgz",
"integrity": "sha512-MUqTm6wW+clI0pVeK9RIdB8svWK6mu44zsAw8BSVZYYKchigbBzTgwJe0vAYFBfR0TPjD1gJl62pSw4g0F14fQ==",
"version": "0.18.3",
"resolved": "https://registry.npmjs.org/druid-query-toolkit/-/druid-query-toolkit-0.18.3.tgz",
"integrity": "sha512-Za2U2NsFyun5HXeWnLCICnTFzZp4aC17aSOjgVbQgEWZNMPht51U4paE3SVhPDObkWDjDUYAqVv+mO+ZyMx9Og==",
"dependencies": {
"tslib": "^2.3.1"
},
@ -32625,9 +32625,9 @@
}
},
"druid-query-toolkit": {
"version": "0.18.2",
"resolved": "https://registry.npmjs.org/druid-query-toolkit/-/druid-query-toolkit-0.18.2.tgz",
"integrity": "sha512-MUqTm6wW+clI0pVeK9RIdB8svWK6mu44zsAw8BSVZYYKchigbBzTgwJe0vAYFBfR0TPjD1gJl62pSw4g0F14fQ==",
"version": "0.18.3",
"resolved": "https://registry.npmjs.org/druid-query-toolkit/-/druid-query-toolkit-0.18.3.tgz",
"integrity": "sha512-Za2U2NsFyun5HXeWnLCICnTFzZp4aC17aSOjgVbQgEWZNMPht51U4paE3SVhPDObkWDjDUYAqVv+mO+ZyMx9Og==",
"requires": {
"tslib": "^2.3.1"
}

View File

@ -76,7 +76,7 @@
"d3-axis": "^2.1.0",
"d3-scale": "^3.3.0",
"d3-selection": "^2.0.0",
"druid-query-toolkit": "^0.18.2",
"druid-query-toolkit": "^0.18.3",
"file-saver": "^2.0.2",
"follow-redirects": "^1.14.7",
"fontsource-open-sans": "^3.0.9",

View File

@ -23,7 +23,7 @@ const snarkdown = require('snarkdown');
const writefile = 'lib/sql-docs.js';
const MINIMUM_EXPECTED_NUMBER_OF_FUNCTIONS = 164;
const MINIMUM_EXPECTED_NUMBER_OF_FUNCTIONS = 167;
const MINIMUM_EXPECTED_NUMBER_OF_DATA_TYPES = 14;
const initialFunctionDocs = {

View File

@ -70,7 +70,7 @@ export interface Field<M> {
hide?: Functor<M, boolean>;
hideInMore?: Functor<M, boolean>;
valueAdjustment?: (value: any) => any;
adjustment?: (model: Partial<M>) => Partial<M>;
adjustment?: (model: Partial<M>, oldModel: Partial<M>) => Partial<M>;
issueWithValue?: (value: any) => string | undefined;
customSummary?: (v: any) => string;
@ -217,7 +217,7 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
}
if (field.adjustment) {
newModel = field.adjustment(newModel);
newModel = field.adjustment(newModel, model);
}
this.modelChange(newModel);

View File

@ -45,7 +45,6 @@ import {
import './console-application.scss';
export interface ConsoleApplicationProps {
exampleManifestsUrl?: string;
defaultQueryContext?: Record<string, any>;
mandatoryQueryContext?: Record<string, any>;
}
@ -213,15 +212,12 @@ export class ConsoleApplication extends React.PureComponent<
};
private readonly wrappedDataLoaderView = () => {
const { exampleManifestsUrl } = this.props;
return this.wrapInViewContainer(
'data-loader',
<LoadDataView
mode="all"
initTaskId={this.taskId}
initSupervisorId={this.supervisorId}
exampleManifestsUrl={exampleManifestsUrl}
goToIngestion={this.goToIngestionWithTaskGroupId}
/>,
'narrow-pad',
@ -241,14 +237,11 @@ export class ConsoleApplication extends React.PureComponent<
};
private readonly wrappedClassicBatchDataLoaderView = () => {
const { exampleManifestsUrl } = this.props;
return this.wrapInViewContainer(
'classic-batch-data-loader',
<LoadDataView
mode="batch"
initTaskId={this.taskId}
exampleManifestsUrl={exampleManifestsUrl}
goToIngestion={this.goToIngestionWithTaskGroupId}
/>,
'narrow-pad',

View File

@ -1,85 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { CSV_SAMPLE, JSON_SAMPLE } from '../../utils/sampler.mock';
import { getDimensionSpecs } from './dimension-spec';
describe('dimension-spec', () => {
describe('getDimensionSpecs', () => {
it('works for empty', () => {
expect(getDimensionSpecs({ header: ['header'], rows: [] }, {}, false, true)).toEqual([
'header',
]);
});
it('works with json', () => {
expect(getDimensionSpecs(JSON_SAMPLE, {}, false, false)).toEqual([
'timestamp',
'user',
{
name: 'followers',
type: 'long',
},
{
name: 'spend',
type: 'double',
},
'id',
'tags',
'nums',
]);
expect(getDimensionSpecs(JSON_SAMPLE, {}, false, true)).toEqual([
'timestamp',
'user',
'id',
'tags',
'nums',
]);
});
it('works with csv', () => {
expect(getDimensionSpecs(CSV_SAMPLE, {}, true, false)).toEqual([
'timestamp',
'user',
{
name: 'followers',
type: 'long',
},
{
name: 'spend',
type: 'double',
},
{
name: 'id',
type: 'long',
},
'tags',
'nums',
]);
expect(getDimensionSpecs(CSV_SAMPLE, {}, true, true)).toEqual([
'timestamp',
'user',
'tags',
'nums',
]);
});
});
});

View File

@ -18,13 +18,16 @@
import type { Field } from '../../components';
import { filterMap, typeIs } from '../../utils';
import type { SampleHeaderAndRows } from '../../utils/sampler';
import { guessColumnTypeFromHeaderAndRows } from '../ingestion-spec/ingestion-spec';
import type { SampleResponse } from '../../utils/sampler';
import { getHeaderNamesFromSampleResponse } from '../../utils/sampler';
import { guessColumnTypeFromSampleResponse } from '../ingestion-spec/ingestion-spec';
export interface DimensionsSpec {
readonly dimensions?: (string | DimensionSpec)[];
readonly dimensionExclusions?: string[];
readonly spatialDimensions?: any[];
readonly includeAllDimensions?: boolean;
readonly useSchemaDiscovery?: boolean;
}
export interface DimensionSpec {
@ -77,20 +80,19 @@ export function inflateDimensionSpec(dimensionSpec: string | DimensionSpec): Dim
}
export function getDimensionSpecs(
headerAndRows: SampleHeaderAndRows,
sampleResponse: SampleResponse,
typeHints: Record<string, string>,
guessNumericStringsAsNumbers: boolean,
hasRollup: boolean,
): (string | DimensionSpec)[] {
return filterMap(headerAndRows.header, h => {
if (h === '__time') return;
const type =
return filterMap(getHeaderNamesFromSampleResponse(sampleResponse, true), h => {
const dimensionType =
typeHints[h] ||
guessColumnTypeFromHeaderAndRows(headerAndRows, h, guessNumericStringsAsNumbers);
if (type === 'string') return h;
guessColumnTypeFromSampleResponse(sampleResponse, h, guessNumericStringsAsNumbers);
if (dimensionType === 'string') return h;
if (hasRollup) return;
return {
type,
type: dimensionType === 'COMPLEX<json>' ? 'json' : dimensionType,
name: h,
};
});

View File

@ -16,15 +16,15 @@
* limitations under the License.
*/
import { CSV_SAMPLE } from '../../utils/sampler.mock';
import { CSV_SAMPLE, JSON_SAMPLE } from '../../utils/sampler.mock';
import type { IngestionSpec } from './ingestion-spec';
import {
adjustId,
cleanSpec,
guessColumnTypeFromHeaderAndRows,
guessColumnTypeFromInput,
guessInputFormat,
guessColumnTypeFromSampleResponse,
guessSimpleInputFormat,
updateSchemaWithSample,
upgradeSpec,
} from './ingestion-spec';
@ -565,26 +565,26 @@ describe('ingestion-spec', () => {
});
});
describe('guessInputFormat', () => {
describe('guessSimpleInputFormat', () => {
it('works for parquet', () => {
expect(guessInputFormat(['PAR1lol']).type).toEqual('parquet');
expect(guessSimpleInputFormat(['PAR1lol']).type).toEqual('parquet');
});
it('works for orc', () => {
expect(guessInputFormat(['ORClol']).type).toEqual('orc');
expect(guessSimpleInputFormat(['ORClol']).type).toEqual('orc');
});
it('works for AVRO', () => {
expect(guessInputFormat(['Obj\x01lol']).type).toEqual('avro_ocf');
expect(guessInputFormat(['Obj1lol']).type).toEqual('regex');
expect(guessSimpleInputFormat(['Obj\x01lol']).type).toEqual('avro_ocf');
expect(guessSimpleInputFormat(['Obj1lol']).type).toEqual('regex');
});
it('works for JSON (strict)', () => {
expect(guessInputFormat(['{"a":1}'])).toEqual({ type: 'json' });
expect(guessSimpleInputFormat(['{"a":1}'])).toEqual({ type: 'json' });
});
it('works for JSON (lax)', () => {
expect(guessInputFormat([`{hello:'world'}`])).toEqual({
expect(guessSimpleInputFormat([`{hello:'world'}`])).toEqual({
type: 'json',
featureSpec: {
ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER: true,
@ -602,14 +602,14 @@ describe('ingestion-spec', () => {
});
it('works for CSV (with header)', () => {
expect(guessInputFormat(['A,B,"X,1",Y'])).toEqual({
expect(guessSimpleInputFormat(['A,B,"X,1",Y'])).toEqual({
type: 'csv',
findColumnsFromHeader: true,
});
});
it('works for CSV (no header)', () => {
expect(guessInputFormat(['"A,1","B,2",1,2'])).toEqual({
expect(guessSimpleInputFormat(['"A,1","B,2",1,2'])).toEqual({
type: 'csv',
findColumnsFromHeader: false,
columns: ['column1', 'column2', 'column3', 'column4'],
@ -617,14 +617,14 @@ describe('ingestion-spec', () => {
});
it('works for TSV (with header)', () => {
expect(guessInputFormat(['A\tB\tX\tY'])).toEqual({
expect(guessSimpleInputFormat(['A\tB\tX\tY'])).toEqual({
type: 'tsv',
findColumnsFromHeader: true,
});
});
it('works for TSV (no header)', () => {
expect(guessInputFormat(['A\tB\t1\t2\t3\t4\t5\t6\t7\t8\t9'])).toEqual({
expect(guessSimpleInputFormat(['A\tB\t1\t2\t3\t4\t5\t6\t7\t8\t9'])).toEqual({
type: 'tsv',
findColumnsFromHeader: false,
columns: [
@ -644,7 +644,7 @@ describe('ingestion-spec', () => {
});
it('works for TSV with ;', () => {
const inputFormat = guessInputFormat(['A;B;X;Y']);
const inputFormat = guessSimpleInputFormat(['A;B;X;Y']);
expect(inputFormat).toEqual({
type: 'tsv',
delimiter: ';',
@ -653,7 +653,7 @@ describe('ingestion-spec', () => {
});
it('works for TSV with |', () => {
const inputFormat = guessInputFormat(['A|B|X|Y']);
const inputFormat = guessSimpleInputFormat(['A|B|X|Y']);
expect(inputFormat).toEqual({
type: 'tsv',
delimiter: '|',
@ -662,7 +662,7 @@ describe('ingestion-spec', () => {
});
it('works for regex', () => {
expect(guessInputFormat(['A/B/X/Y'])).toEqual({
expect(guessSimpleInputFormat(['A/B/X/Y'])).toEqual({
type: 'regex',
pattern: '([\\s\\S]*)',
columns: ['line'],
@ -745,30 +745,19 @@ describe('spec utils', () => {
});
});
describe('guessColumnTypeFromHeaderAndRows', () => {
it('works in empty dataset', () => {
expect(guessColumnTypeFromHeaderAndRows({ header: ['c0'], rows: [] }, 'c0', false)).toEqual(
'string',
);
});
describe('guessColumnTypeFromSampleResponse', () => {
it('works for generic dataset', () => {
expect(guessColumnTypeFromHeaderAndRows(CSV_SAMPLE, 'user', false)).toEqual('string');
expect(guessColumnTypeFromHeaderAndRows(CSV_SAMPLE, 'followers', false)).toEqual('string');
expect(guessColumnTypeFromHeaderAndRows(CSV_SAMPLE, 'followers', true)).toEqual('long');
expect(guessColumnTypeFromHeaderAndRows(CSV_SAMPLE, 'spend', true)).toEqual('double');
expect(guessColumnTypeFromHeaderAndRows(CSV_SAMPLE, 'nums', false)).toEqual('string');
expect(guessColumnTypeFromHeaderAndRows(CSV_SAMPLE, 'nums', true)).toEqual('string');
expect(guessColumnTypeFromSampleResponse(CSV_SAMPLE, 'user', false)).toEqual('string');
expect(guessColumnTypeFromSampleResponse(CSV_SAMPLE, 'followers', false)).toEqual('string');
expect(guessColumnTypeFromSampleResponse(CSV_SAMPLE, 'followers', true)).toEqual('long');
expect(guessColumnTypeFromSampleResponse(CSV_SAMPLE, 'spend', true)).toEqual('double');
expect(guessColumnTypeFromSampleResponse(CSV_SAMPLE, 'nums', false)).toEqual('string');
expect(guessColumnTypeFromSampleResponse(CSV_SAMPLE, 'nums', true)).toEqual('string');
});
});
it('updateSchemaWithSample', () => {
const withRollup = updateSchemaWithSample(
ingestionSpec,
{ header: ['header'], rows: [] },
'specific',
true,
);
const withRollup = updateSchemaWithSample(ingestionSpec, JSON_SAMPLE, 'specific', true);
expect(withRollup).toMatchInlineSnapshot(`
Object {
@ -777,7 +766,10 @@ describe('spec utils', () => {
"dataSource": "wikipedia",
"dimensionsSpec": Object {
"dimensions": Array [
"header",
"user",
"id",
"tags",
"nums",
],
},
"granularitySpec": Object {
@ -790,6 +782,16 @@ describe('spec utils', () => {
"name": "count",
"type": "count",
},
Object {
"fieldName": "followers",
"name": "sum_followers",
"type": "longSum",
},
Object {
"fieldName": "spend",
"name": "sum_spend",
"type": "doubleSum",
},
],
"timestampSpec": Object {
"column": "timestamp",
@ -820,12 +822,7 @@ describe('spec utils', () => {
}
`);
const noRollup = updateSchemaWithSample(
ingestionSpec,
{ header: ['header'], rows: [] },
'specific',
false,
);
const noRollup = updateSchemaWithSample(ingestionSpec, JSON_SAMPLE, 'specific', false);
expect(noRollup).toMatchInlineSnapshot(`
Object {
@ -834,7 +831,18 @@ describe('spec utils', () => {
"dataSource": "wikipedia",
"dimensionsSpec": Object {
"dimensions": Array [
"header",
"user",
Object {
"name": "followers",
"type": "long",
},
Object {
"name": "spend",
"type": "double",
},
"id",
"tags",
"nums",
],
},
"granularitySpec": Object {

View File

@ -34,12 +34,13 @@ import {
EMPTY_ARRAY,
EMPTY_OBJECT,
filterMap,
findMap,
isSimpleArray,
oneOf,
parseCsvLine,
typeIs,
} from '../../utils';
import type { SampleHeaderAndRows } from '../../utils/sampler';
import type { SampleResponse } from '../../utils/sampler';
import type { DimensionsSpec } from '../dimension-spec/dimension-spec';
import {
getDimensionSpecName,
@ -269,6 +270,9 @@ export interface DataSchema {
export type DimensionMode = 'specific' | 'auto-detect';
export function getDimensionMode(spec: Partial<IngestionSpec>): DimensionMode {
if (deepGet(spec, 'spec.dataSchema.dimensionsSpec.useSchemaDiscovery') === true) {
return 'auto-detect';
}
const dimensions = deepGet(spec, 'spec.dataSchema.dimensionsSpec.dimensions') || EMPTY_ARRAY;
return Array.isArray(dimensions) && dimensions.length === 0 ? 'auto-detect' : 'specific';
}
@ -893,6 +897,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
label: 'Bootstrap servers',
type: 'string',
required: true,
placeholder: 'kafka_broker_host:9092',
info: (
<>
<ExternalLink
@ -914,6 +919,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
type: 'string',
required: true,
defined: typeIs('kafka'),
placeholder: 'topic_name',
},
{
name: 'consumerProperties',
@ -2134,13 +2140,13 @@ export function updateIngestionType(
}
export function issueWithSampleData(
sampleData: string[],
sampleData: SampleResponse,
spec: Partial<IngestionSpec>,
): JSX.Element | undefined {
if (isStreamingSpec(spec)) return;
if (sampleData.length) {
const firstData = sampleData[0];
const firstData: string = findMap(sampleData.data, l => l.input?.raw);
if (firstData) return;
if (firstData === '{') {
return (
@ -2161,20 +2167,25 @@ export function issueWithSampleData(
</>
);
}
}
return;
}
export function fillInputFormatIfNeeded(
spec: Partial<IngestionSpec>,
sampleData: string[],
sampleResponse: SampleResponse,
): Partial<IngestionSpec> {
if (deepGet(spec, 'spec.ioConfig.inputFormat.type')) return spec;
return deepSet(
spec,
'spec.ioConfig.inputFormat',
guessInputFormat(sampleData, isStreamingSpec(spec)),
getSpecType(spec) === 'kafka'
? guessKafkaInputFormat(filterMap(sampleResponse.data, l => l.input))
: guessSimpleInputFormat(
filterMap(sampleResponse.data, l => l.input?.raw),
isStreamingSpec(spec),
),
);
}
@ -2182,8 +2193,23 @@ function noNumbers(xs: string[]): boolean {
return xs.every(x => isNaN(Number(x)));
}
export function guessInputFormat(sampleData: string[], canBeMultiLineJson = false): InputFormat {
let sampleDatum = sampleData[0];
export function guessKafkaInputFormat(sampleRaw: Record<string, any>[]): InputFormat {
const hasHeader = sampleRaw.some(x => Object.keys(x).some(k => k.startsWith('kafka.header.')));
const keys = filterMap(sampleRaw, x => x['kafka.key']);
const payloads = filterMap(sampleRaw, x => x.raw);
return {
type: 'kafka',
headerFormat: hasHeader ? { type: 'string' } : undefined,
keyFormat: keys.length ? guessSimpleInputFormat(keys, true) : undefined,
valueFormat: guessSimpleInputFormat(payloads, true),
};
}
export function guessSimpleInputFormat(
sampleRaw: string[],
canBeMultiLineJson = false,
): InputFormat {
let sampleDatum = sampleRaw[0];
if (sampleDatum) {
sampleDatum = String(sampleDatum); // Really ensure it is a string
@ -2319,11 +2345,11 @@ function inputFormatFromType(options: InputFormatFromTypeOptions): InputFormat {
// ------------------------
export function guessIsArrayFromHeaderAndRows(
headerAndRows: SampleHeaderAndRows,
export function guessIsArrayFromSampleResponse(
sampleResponse: SampleResponse,
column: string,
): boolean {
return headerAndRows.rows.some(r => isSimpleArray(r.input?.[column]));
return sampleResponse.data.some(r => isSimpleArray(r.input?.[column]));
}
export function guessColumnTypeFromInput(
@ -2355,13 +2381,13 @@ export function guessColumnTypeFromInput(
}
}
export function guessColumnTypeFromHeaderAndRows(
headerAndRows: SampleHeaderAndRows,
export function guessColumnTypeFromSampleResponse(
sampleResponse: SampleResponse,
column: string,
guessNumericStringsAsNumbers: boolean,
): string {
return guessColumnTypeFromInput(
filterMap(headerAndRows.rows, r => r.input?.[column]),
filterMap(sampleResponse.data, r => r.input?.[column]),
guessNumericStringsAsNumbers,
);
}
@ -2391,7 +2417,7 @@ function getTypeHintsFromSpec(spec: Partial<IngestionSpec>): Record<string, stri
export function updateSchemaWithSample(
spec: Partial<IngestionSpec>,
headerAndRows: SampleHeaderAndRows,
sampleResponse: SampleResponse,
dimensionMode: DimensionMode,
rollup: boolean,
forcePartitionInitialization = false,
@ -2404,26 +2430,25 @@ export function updateSchemaWithSample(
let newSpec = spec;
if (dimensionMode === 'auto-detect') {
newSpec = deepDelete(newSpec, 'spec.dataSchema.dimensionsSpec.dimensions');
newSpec = deepSet(newSpec, 'spec.dataSchema.dimensionsSpec.useSchemaDiscovery', true);
newSpec = deepSet(newSpec, 'spec.dataSchema.dimensionsSpec.includeAllDimensions', true);
newSpec = deepSet(newSpec, 'spec.dataSchema.dimensionsSpec.dimensionExclusions', []);
newSpec = deepDelete(newSpec, 'spec.dataSchema.dimensionsSpec.dimensions');
} else {
newSpec = deepDelete(newSpec, 'spec.dataSchema.dimensionsSpec.useSchemaDiscovery');
newSpec = deepDelete(newSpec, 'spec.dataSchema.dimensionsSpec.includeAllDimensions');
newSpec = deepDelete(newSpec, 'spec.dataSchema.dimensionsSpec.dimensionExclusions');
const dimensions = getDimensionSpecs(
headerAndRows,
typeHints,
guessNumericStringsAsNumbers,
rollup,
newSpec = deepSet(
newSpec,
'spec.dataSchema.dimensionsSpec.dimensions',
getDimensionSpecs(sampleResponse, typeHints, guessNumericStringsAsNumbers, rollup),
);
if (dimensions) {
newSpec = deepSet(newSpec, 'spec.dataSchema.dimensionsSpec.dimensions', dimensions);
}
}
if (rollup) {
newSpec = deepSet(newSpec, 'spec.dataSchema.granularitySpec.queryGranularity', 'hour');
const metrics = getMetricSpecs(headerAndRows, typeHints, guessNumericStringsAsNumbers);
const metrics = getMetricSpecs(sampleResponse, typeHints, guessNumericStringsAsNumbers);
if (metrics) {
newSpec = deepSet(newSpec, 'spec.dataSchema.metricsSpec', metrics);
}

View File

@ -22,7 +22,7 @@ import React from 'react';
import type { Field } from '../../components';
import { AutoForm, ExternalLink } from '../../components';
import { getLink } from '../../links';
import { compact, oneOf, typeIs } from '../../utils';
import { compact, deepGet, deepSet, oneOf, typeIs } from '../../utils';
import type { FlattenSpec } from '../flatten-spec/flatten-spec';
export interface InputFormat {
@ -39,6 +39,14 @@ export interface InputFormat {
readonly keepNullColumns?: boolean;
readonly assumeNewlineDelimited?: boolean;
readonly useJsonNodeReader?: boolean;
// type: kafka
readonly timestampColumnName?: string;
readonly headerFormat?: { type: 'string'; encoding?: string };
readonly headerColumnPrefix?: string;
readonly keyFormat?: InputFormat;
readonly keyColumnName?: string;
readonly valueFormat?: InputFormat;
}
function generateInputFormatFields(streaming: boolean) {
@ -88,7 +96,7 @@ function generateInputFormatFields(streaming: boolean) {
name: 'assumeNewlineDelimited',
type: 'boolean',
defined: typeIs('json'),
disabled: (inputFormat: InputFormat) => inputFormat.useJsonNodeReader,
disabled: inputFormat => Boolean(inputFormat.useJsonNodeReader),
defaultValue: false,
info: (
<>
@ -115,10 +123,10 @@ function generateInputFormatFields(streaming: boolean) {
streaming
? {
name: 'useJsonNodeReader',
title: 'Use JSON node reader',
label: 'Use JSON node reader',
type: 'boolean',
defined: typeIs('json'),
disabled: (inputFormat: InputFormat) => inputFormat.assumeNewlineDelimited,
disabled: inputFormat => Boolean(inputFormat.assumeNewlineDelimited),
defaultValue: false,
info: (
<>
@ -224,11 +232,274 @@ function generateInputFormatFields(streaming: boolean) {
] as (Field<InputFormat> | undefined)[]);
}
export const INPUT_FORMAT_FIELDS: Field<InputFormat>[] = generateInputFormatFields(false);
export const BATCH_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = generateInputFormatFields(false);
export const STREAMING_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = generateInputFormatFields(true);
export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
{
name: 'timestampColumnName',
label: 'Kafka timestamp column name',
type: 'string',
defaultValue: 'kafka.timestamp',
defined: typeIs('kafka'),
info: `Name of the column for the kafka record's timestamp.`,
},
// -----------------------------------------------------
// keyFormat fields
{
name: 'keyFormat.type',
label: 'Kafka key input format',
type: 'string',
suggestions: [
undefined,
'json',
'csv',
'tsv',
'parquet',
'orc',
'avro_ocf',
'avro_stream',
'regex',
],
placeholder: `(don't parse Kafka key)`,
defined: typeIs('kafka'),
info: (
<>
<p>The parser used to parse the key of the Kafka message.</p>
<p>
For more information see{' '}
<ExternalLink href={`${getLink('DOCS')}/ingestion/data-formats.html`}>
the documentation
</ExternalLink>
.
</p>
</>
),
adjustment: inputFormat => {
const keyFormatType = deepGet(inputFormat, 'keyFormat.type');
// If the user selects one of these formats then populate the columns (that are in any case meaningless in this context)
// with an initial value.
switch (keyFormatType) {
case 'regex':
inputFormat = deepSet(inputFormat, 'keyFormat.pattern', '([\\s\\S]*)');
inputFormat = deepSet(inputFormat, 'keyFormat.columns', ['x']);
break;
case 'csv':
case 'tsv':
inputFormat = deepSet(inputFormat, 'keyFormat.findColumnsFromHeader', false);
inputFormat = deepSet(inputFormat, 'keyFormat.columns', ['x']);
break;
}
return inputFormat;
},
},
{
name: 'keyFormat.featureSpec',
label: 'Kafka key JSON parser features',
type: 'json',
defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'json',
hideInMore: true,
info: (
<>
<p>
<ExternalLink href="https://github.com/FasterXML/jackson-core/wiki/JsonParser-Features">
JSON parser features
</ExternalLink>{' '}
supported by Jackson library. Those features will be applied when parsing the input JSON
data.
</p>
<p>
Example:{' '}
<Code>{`{ "ALLOW_SINGLE_QUOTES": true, "ALLOW_UNQUOTED_FIELD_NAMES": true }`}</Code>
</p>
</>
),
},
{
name: 'keyFormat.assumeNewlineDelimited',
label: 'Kafka key assume newline delimited',
type: 'boolean',
defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'json',
disabled: inputFormat => Boolean(inputFormat.useJsonNodeReader),
defaultValue: false,
hideInMore: true,
info: (
<>
<p>
In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single JSON
event spans multiple lines). However, if a parsing exception occurs, all JSON events that
are present in the same streaming record will be discarded.
</p>
<p>
<Code>assumeNewlineDelimited</Code> and <Code>useJsonNodeReader</Code> (at most one can be{' '}
<Code>true</Code>) affect only how parsing exceptions are handled.
</p>
<p>
If the input is known to be newline delimited JSON (each individual JSON event is
contained in a single line, separated by newlines), setting this option to true allows for
more flexible parsing exception handling. Only the lines with invalid JSON syntax will be
discarded, while lines containing valid JSON events will still be ingested.
</p>
</>
),
},
{
name: 'keyFormat.useJsonNodeReader',
label: 'Kafka key use JSON node reader',
type: 'boolean',
defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'json',
disabled: inputFormat => Boolean(inputFormat.assumeNewlineDelimited),
defaultValue: false,
hideInMore: true,
info: (
<>
{' '}
<p>
In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single JSON
event spans multiple lines). However, if a parsing exception occurs, all JSON events that
are present in the same streaming record will be discarded.
</p>
<p>
<Code>assumeNewlineDelimited</Code> and <Code>useJsonNodeReader</Code> (at most one can be{' '}
<Code>true</Code>) affect only how parsing exceptions are handled.
</p>
<p>
When ingesting multi-line JSON events, enabling this option will enable the use of a JSON
parser which will retain any valid JSON events encountered within a streaming record prior
to when a parsing exception occurred.
</p>
</>
),
},
{
name: 'keyFormat.delimiter',
label: 'Kafka key delimiter',
type: 'string',
defaultValue: '\t',
suggestions: ['\t', ';', '|', '#'],
defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'tsv',
info: <>A custom delimiter for data values.</>,
},
{
name: 'keyFormat.pattern',
label: 'Kafka key pattern',
type: 'string',
defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'regex',
required: true,
},
{
name: 'keyFormat.skipHeaderRows',
label: 'Kafka key skip header rows',
type: 'number',
defaultValue: 0,
defined: inputFormat => oneOf(deepGet(inputFormat, 'keyFormat.type'), 'csv', 'tsv'),
min: 0,
info: (
<>
If this is set, skip the first <Code>skipHeaderRows</Code> rows from each file.
</>
),
},
{
name: 'keyFormat.findColumnsFromHeader',
label: 'Kafka key find columns from header',
type: 'boolean',
defined: inputFormat => oneOf(deepGet(inputFormat, 'keyFormat.type'), 'csv', 'tsv'),
required: true,
hideInMore: true,
info: (
<>
If this is set, find the column names from the header row. Note that
<Code>skipHeaderRows</Code> will be applied before finding column names from the header. For
example, if you set <Code>skipHeaderRows</Code> to 2 and <Code>findColumnsFromHeader</Code>{' '}
to true, the task will skip the first two lines and then extract column information from the
third line.
</>
),
},
{
name: 'keyFormat.columns',
label: 'Kafka key columns',
type: 'string-array',
required: true,
defined: inputFormat =>
(oneOf(deepGet(inputFormat, 'keyFormat.type'), 'csv', 'tsv') &&
deepGet(inputFormat, 'keyFormat.findColumnsFromHeader') === false) ||
deepGet(inputFormat, 'keyFormat.type') === 'regex',
hideInMore: true,
info: (
<>
Only the value of the first column will be read, the name of the column will be ignored so
enter anything here.
</>
),
},
{
name: 'keyFormat.listDelimiter',
label: 'Kafka key list delimiter',
type: 'string',
defaultValue: '\x01',
suggestions: ['\x01', '\x00'],
defined: inputFormat => oneOf(deepGet(inputFormat, 'keyFormat.type'), 'csv', 'tsv', 'regex'),
info: <>A custom delimiter for multi-value dimensions.</>,
},
{
name: 'keyFormat.binaryAsString',
label: 'Kafka key list binary as string',
type: 'boolean',
defaultValue: false,
defined: inputFormat =>
oneOf(deepGet(inputFormat, 'valueFormat.type'), 'parquet', 'orc', 'avro_ocf', 'avro_stream'),
info: (
<>
Specifies if the binary column which is not logically marked as a string should be treated
as a UTF-8 encoded string.
</>
),
},
// keyColumnName
{
name: 'keyColumnName',
label: 'Kafka key column name',
type: 'string',
defaultValue: 'kafka.key',
defined: inputFormat => Boolean(deepGet(inputFormat, 'keyFormat.type')),
info: `Custom prefix for all the header columns.`,
},
// -----------------------------------------------------
{
name: 'headerFormat.type',
label: 'Kafka header format type',
type: 'string',
defined: typeIs('kafka'),
placeholder: `(don't parse Kafka herders)`,
suggestions: [undefined, 'string'],
},
{
name: 'headerFormat.encoding',
label: 'Kafka header format encoding',
type: 'string',
defaultValue: 'UTF-8',
defined: inputFormat => deepGet(inputFormat, 'headerFormat.type') === 'string',
suggestions: ['UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16', 'US-ASCII', 'ISO-8859-1'],
},
{
name: 'headerColumnPrefix',
label: 'Kafka header column prefix',
type: 'string',
defaultValue: 'kafka.header.',
defined: inputFormat => deepGet(inputFormat, 'headerFormat.type') === 'string',
info: `Custom prefix for all the header columns.`,
},
];
export function issueWithInputFormat(inputFormat: InputFormat | undefined): string | undefined {
return AutoForm.issueWithModel(inputFormat, INPUT_FORMAT_FIELDS);
return AutoForm.issueWithModel(inputFormat, BATCH_INPUT_FORMAT_FIELDS);
}
export const inputFormatCanProduceNestedData: (inputFormat: InputFormat) => boolean = typeIs(

View File

@ -16,16 +16,14 @@
* limitations under the License.
*/
import { JSON_SAMPLE } from '../../utils/sampler.mock';
import { EMPTY_SAMPLE, JSON_SAMPLE } from '../../utils/sampler.mock';
import { getMetricSpecs } from './metric-spec';
describe('metric-spec', () => {
describe('getMetricSecs', () => {
it('works for empty', () => {
expect(getMetricSpecs({ header: ['header'], rows: [] }, {}, false)).toEqual([
{ name: 'count', type: 'count' },
]);
expect(getMetricSpecs(EMPTY_SAMPLE, {}, false)).toEqual([{ name: 'count', type: 'count' }]);
});
it('works with json', () => {

View File

@ -23,8 +23,8 @@ import type { Field } from '../../components';
import { ExternalLink } from '../../components';
import { getLink } from '../../links';
import { filterMap, typeIs } from '../../utils';
import type { SampleHeaderAndRows } from '../../utils/sampler';
import { guessColumnTypeFromHeaderAndRows } from '../ingestion-spec/ingestion-spec';
import type { SampleResponse } from '../../utils/sampler';
import { guessColumnTypeFromSampleResponse } from '../ingestion-spec/ingestion-spec';
export interface MetricSpec {
readonly type: string;
@ -388,16 +388,17 @@ export function getMetricSpecOutputType(metricSpec: MetricSpec): string | undefi
}
export function getMetricSpecs(
headerAndRows: SampleHeaderAndRows,
sampleResponse: SampleResponse,
typeHints: Record<string, string>,
guessNumericStringsAsNumbers: boolean,
): MetricSpec[] {
return [{ name: 'count', type: 'count' }].concat(
filterMap(headerAndRows.header, h => {
filterMap(sampleResponse.logicalSegmentSchema, s => {
const h = s.name;
if (h === '__time') return;
const type =
typeHints[h] ||
guessColumnTypeFromHeaderAndRows(headerAndRows, h, guessNumericStringsAsNumbers);
guessColumnTypeFromSampleResponse(sampleResponse, h, guessNumericStringsAsNumbers);
switch (type) {
case 'double':
return { name: `sum_${h}`, type: 'doubleSum', fieldName: h };

View File

@ -16,8 +16,9 @@
* limitations under the License.
*/
import { timeFormatMatches } from './time';
import { possibleDruidFormatForValues, timeFormatMatches } from './time';
describe('time', () => {
describe('timeFormatMatches', () => {
it('works for auto', () => {
expect(timeFormatMatches('auto', '2019-05-22 22:42:51+0000')).toBeTruthy();
@ -28,3 +29,18 @@ describe('timeFormatMatches', () => {
expect(timeFormatMatches('iso', '2019-05-22 22:42:51+0000')).toBeFalsy();
});
});
describe('possibleDruidFormatForValues', () => {
it('works in empty case', () => {
expect(possibleDruidFormatForValues([])).toBeUndefined();
});
it('does not react to small numbers', () => {
expect(possibleDruidFormatForValues([12, 234, 3432])).toBeUndefined();
});
it('works for auto', () => {
expect(possibleDruidFormatForValues(['2019-05-22 22:42:51'])).toEqual('auto');
});
});
});

View File

@ -86,6 +86,7 @@ export function timeFormatMatches(format: string, value: string | number | bigin
}
export function possibleDruidFormatForValues(values: any[]): string | undefined {
if (!values.length) return;
return ALL_FORMAT_VALUES.find(format => {
return values.every(value => timeFormatMatches(format, value));
});

View File

@ -54,9 +54,6 @@ interface ConsoleConfig {
// A set of custom headers name/value to set on every AJAX request
customHeaders?: Record<string, string>;
// The URL for where to load the example manifest, a JSON document that tells the console where to find all the example datasets
exampleManifestsUrl?: string;
// The query context to set if the user does not have one saved in local storage, defaults to {}
defaultQueryContext?: Record<string, any>;
@ -104,7 +101,6 @@ QueryRunner.defaultQueryExecutor = (payload, isSql, cancelToken) => {
ReactDOM.render(
React.createElement(ConsoleApplication, {
exampleManifestsUrl: consoleConfig.exampleManifestsUrl,
defaultQueryContext: consoleConfig.defaultQueryContext,
mandatoryQueryContext: consoleConfig.mandatoryQueryContext,
}),

View File

@ -36,7 +36,7 @@ import type {
Transform,
} from '../druid-models';
import { inflateDimensionSpec, upgradeSpec } from '../druid-models';
import { deepGet, filterMap, oneOf } from '../utils';
import { deepGet, filterMap, nonEmptyArray, oneOf } from '../utils';
export function getSpecDatasourceName(spec: IngestionSpec): string {
return deepGet(spec, 'spec.dataSchema.dataSource') || 'unknown_datasource';
@ -86,6 +86,10 @@ export function convertSpecToSql(spec: any): QueryWithContext {
const rollup = deepGet(spec, 'spec.dataSchema.granularitySpec.rollup') ?? true;
if (nonEmptyArray(deepGet(spec, 'spec.dataSchema.dimensionsSpec.spatialDimensions'))) {
throw new Error(`spatialDimensions are not currently supported in SQL-based ingestion`);
}
const timestampSpec: TimestampSpec = deepGet(spec, 'spec.dataSchema.timestampSpec');
if (!timestampSpec) throw new Error(`spec.dataSchema.timestampSpec is not defined`);

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useEffect, useRef } from 'react';
export function useLastDefinedDeep<T>(cur: T | undefined): T | undefined {
const last = useRef<T>();
useEffect(() => {
if (typeof cur === 'undefined' || JSON.stringify(last.current) === JSON.stringify(cur)) return;
last.current = cur;
}, [cur]);
return typeof cur === 'undefined' ? last.current : cur;
}

View File

@ -278,6 +278,13 @@ export function filterMap<T, Q>(xs: readonly T[], f: (x: T, i: number) => Q | un
return xs.map(f).filter((x: Q | undefined) => typeof x !== 'undefined') as Q[];
}
export function findMap<T, Q>(
xs: readonly T[],
f: (x: T, i: number) => Q | undefined,
): Q | undefined {
return filterMap(xs, f)[0];
}
export function compact<T>(xs: (T | undefined | false | null | '')[]): T[] {
return xs.filter(Boolean) as T[];
}

View File

@ -17,21 +17,66 @@
*/
// Just to make sure we are in a test context. This line will cause trouble if this file is ever compiled into the main build
import type { SampleHeaderAndRows } from './sampler';
import type { SampleResponse } from './sampler';
expect(1).toEqual(1);
expect(1).toEqual(1); // Just to make sure this file does not get included in the build by accident
export const EMPTY_SAMPLE: SampleResponse = {
numRowsRead: 0,
numRowsIndexed: 0,
logicalDimensions: [],
physicalDimensions: [],
logicalSegmentSchema: [{ name: '__time', type: 'LONG' }],
data: [],
};
/*
This data is the returned sample when ingested with:
This data is the returned sample when sampling (from the timestamp stage):
{"timestamp":"2016-04-11T09:20:00Z","user":"Alice","followers":10,"spend":0,"id":"12232323","tags":null,"nums":[4]}
{"timestamp":"2016-04-11T09:21:00Z","user":"Bob","followers":0,"spend":3,"id":"45345634","tags":["a"],"nums":[5,6]}
{"timestamp":"2016-04-11T09:22:00Z","user":"Alice","followers":3,"spend":5.1,"id":"73534533","tags":["a","b"],"nums":[7,8]}
*/
export const JSON_SAMPLE: SampleHeaderAndRows = {
header: ['timestamp', 'user', 'followers', 'spend', 'id', 'tags', 'nums'],
rows: [
export const JSON_SAMPLE: SampleResponse = {
numRowsRead: 3,
numRowsIndexed: 3,
logicalDimensions: [
{ type: 'string', name: 'user', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{
type: 'long',
name: 'followers',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: false,
},
{ type: 'json', name: 'spend', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'string', name: 'id', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'json', name: 'tags', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'json', name: 'nums', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
],
physicalDimensions: [
{ type: 'auto', name: 'user', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{
type: 'auto',
name: 'followers',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: true,
},
{ type: 'auto', name: 'spend', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'auto', name: 'id', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'auto', name: 'tags', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'auto', name: 'nums', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
],
logicalSegmentSchema: [
{ name: '__time', type: 'LONG' },
{ name: 'user', type: 'STRING' },
{ name: 'followers', type: 'LONG' },
{ name: 'spend', type: 'COMPLEX<json>' },
{ name: 'id', type: 'STRING' },
{ name: 'tags', type: 'ARRAY<STRING>' },
{ name: 'nums', type: 'ARRAY<LONG>' },
],
data: [
{
input: {
timestamp: '2016-04-11T09:20:00Z',
@ -43,14 +88,13 @@ export const JSON_SAMPLE: SampleHeaderAndRows = {
nums: [4],
},
parsed: {
__time: 0,
timestamp: '2016-04-11T09:20:00Z',
__time: 1460366400000,
user: 'Alice',
followers: '10',
spend: '0',
followers: 10,
spend: 0,
id: '12232323',
tags: null,
nums: '4',
nums: [4],
},
},
{
@ -64,14 +108,13 @@ export const JSON_SAMPLE: SampleHeaderAndRows = {
nums: [5, 6],
},
parsed: {
__time: 0,
timestamp: '2016-04-11T09:21:00Z',
__time: 1460366460000,
user: 'Bob',
followers: '0',
spend: '3',
followers: 0,
spend: 3,
id: '45345634',
tags: 'a',
nums: ['5', '6'],
tags: ['a'],
nums: [5, 6],
},
},
{
@ -85,14 +128,13 @@ export const JSON_SAMPLE: SampleHeaderAndRows = {
nums: [7, 8],
},
parsed: {
__time: 0,
timestamp: '2016-04-11T09:22:00Z',
__time: 1460366520000,
user: 'Alice',
followers: '3',
spend: '5.1',
followers: 3,
spend: 5.1,
id: '73534533',
tags: ['a', 'b'],
nums: ['7', '8'],
nums: [7, 8],
},
},
],
@ -119,9 +161,45 @@ SELECT
FROM test_data
*/
export const CSV_SAMPLE: SampleHeaderAndRows = {
header: ['timestamp', 'user', 'followers', 'spend', 'id', 'tags', 'nums'],
rows: [
export const CSV_SAMPLE: SampleResponse = {
numRowsRead: 3,
numRowsIndexed: 3,
logicalDimensions: [
{ type: 'string', name: 'user', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{
type: 'string',
name: 'followers',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: true,
},
{ type: 'string', name: 'spend', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'string', name: 'id', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'json', name: 'tags', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'json', name: 'nums', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
],
physicalDimensions: [
{ type: 'auto', name: 'user', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{
type: 'auto',
name: 'followers',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: true,
},
{ type: 'auto', name: 'spend', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'auto', name: 'id', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'auto', name: 'tags', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
{ type: 'auto', name: 'nums', multiValueHandling: 'SORTED_ARRAY', createBitmapIndex: true },
],
logicalSegmentSchema: [
{ name: '__time', type: 'LONG' },
{ name: 'user', type: 'STRING' },
{ name: 'followers', type: 'STRING' },
{ name: 'spend', type: 'STRING' },
{ name: 'id', type: 'STRING' },
{ name: 'tags', type: 'COMPLEX<json>' },
{ name: 'nums', type: 'COMPLEX<json>' },
],
data: [
{
input: {
timestamp: '2016-04-11T09:20:00.000Z',
@ -133,8 +211,7 @@ export const CSV_SAMPLE: SampleHeaderAndRows = {
nums: '4',
},
parsed: {
__time: 0,
timestamp: '2016-04-11T09:20:00.000Z',
__time: 1460366400000,
user: 'Alice',
followers: '10',
spend: '0',
@ -154,8 +231,7 @@ export const CSV_SAMPLE: SampleHeaderAndRows = {
nums: ['5', '6'],
},
parsed: {
__time: 0,
timestamp: '2016-04-11T09:21:00.000Z',
__time: 1460366460000,
user: 'Bob',
followers: '0',
spend: '3',
@ -175,8 +251,7 @@ export const CSV_SAMPLE: SampleHeaderAndRows = {
nums: ['7', '8'],
},
parsed: {
__time: 0,
timestamp: '2016-04-11T09:22:00.000Z',
__time: 1460366520000,
user: 'Alice',
followers: '3',
spend: '5.1',

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import type { SampleResponse } from './sampler';
import { guessDimensionsFromSampleResponse } from './sampler';
describe('sampler', () => {
describe('getInferredDimensionsFromSampleResponse', () => {
const sampleResponse: SampleResponse = {
numRowsRead: 20,
numRowsIndexed: 20,
logicalDimensions: [
{
type: 'long',
name: 'isRobot',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: false,
},
{
type: 'string',
name: 'channel',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: true,
},
{
type: 'string',
name: 'flags',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: true,
},
{
type: 'long',
name: 'isUnpatrolled',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: false,
},
],
physicalDimensions: [
{
type: 'json',
name: 'isRobot',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: true,
},
{
type: 'json',
name: 'channel',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: true,
},
{
type: 'json',
name: 'flags',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: true,
},
{
type: 'json',
name: 'isUnpatrolled',
multiValueHandling: 'SORTED_ARRAY',
createBitmapIndex: true,
},
],
logicalSegmentSchema: [
{ name: '__time', type: 'LONG' },
{ name: 'isRobot', type: 'LONG' },
{ name: 'channel', type: 'STRING' },
{ name: 'flags', type: 'STRING' },
{ name: 'isUnpatrolled', type: 'LONG' },
],
data: [
{
input: {
isRobot: true,
channel: '#sv.wikipedia',
timestamp: '2016-06-27T00:00:11.080Z',
flags: 'NB',
isUnpatrolled: false,
},
parsed: {
__time: 1466985611080,
isRobot: true,
channel: '#sv.wikipedia',
flags: 'NB',
isUnpatrolled: false,
},
},
],
};
it('works', () => {
expect(guessDimensionsFromSampleResponse(sampleResponse)).toMatchInlineSnapshot(`
Array [
Object {
"name": "isRobot",
"type": "string",
},
Object {
"createBitmapIndex": true,
"multiValueHandling": "SORTED_ARRAY",
"name": "channel",
"type": "string",
},
Object {
"createBitmapIndex": true,
"multiValueHandling": "SORTED_ARRAY",
"name": "flags",
"type": "string",
},
Object {
"name": "isUnpatrolled",
"type": "string",
},
]
`);
});
});
});

View File

@ -20,6 +20,7 @@ import { dedupe } from 'druid-query-toolkit';
import * as JSONBig from 'json-bigint-native';
import type {
DimensionSpec,
DimensionsSpec,
IngestionSpec,
IngestionType,
@ -32,21 +33,20 @@ import type {
} from '../druid-models';
import {
getDimensionNamesFromTransforms,
getDimensionSpecName,
getSpecType,
getTimestampSchema,
isDruidSource,
PLACEHOLDER_TIMESTAMP_SPEC,
REINDEX_TIMESTAMP_SPEC,
TIME_COLUMN,
upgradeSpec,
} from '../druid-models';
import { Api } from '../singletons';
import { getDruidErrorMessage, queryDruidRune } from './druid-query';
import { arrangeWithPrefixSuffix, EMPTY_ARRAY, filterMap } from './general';
import { EMPTY_ARRAY, filterMap } from './general';
import { deepGet, deepSet } from './object-change';
const SAMPLER_URL = `/druid/indexer/v1/sampler`;
const BASE_SAMPLER_CONFIG: SamplerConfig = {
numRows: 500,
timeoutMs: 15000,
@ -63,6 +63,38 @@ export interface SamplerConfig {
export interface SampleResponse {
data: SampleEntry[];
logicalSegmentSchema: { name: string; type: string }[];
logicalDimensions: DimensionSpec[];
physicalDimensions: DimensionSpec[];
numRowsIndexed: number;
numRowsRead: number;
}
export function getHeaderNamesFromSampleResponse(
sampleResponse: SampleResponse,
ignoreTimeColumn = false,
) {
return filterMap(sampleResponse.logicalSegmentSchema, s =>
ignoreTimeColumn && s.name === '__time' ? undefined : s.name,
);
}
export function guessDimensionsFromSampleResponse(sampleResponse: SampleResponse): DimensionSpec[] {
const { logicalDimensions, physicalDimensions, data } = sampleResponse;
return logicalDimensions.map(d => {
// Boolean column are currently reported as "long" so let's turn them into "string"
if (
d.type === 'long' &&
physicalDimensions.find(_ => _.name === d.name)?.type === 'json' &&
typeof data[0]?.input?.[d.name] === 'boolean'
) {
return {
name: d.name,
type: 'string',
};
}
return d;
});
}
export type CacheRows = Record<string, any>[];
@ -81,17 +113,6 @@ export interface SampleEntry {
error?: string;
}
export interface SampleHeaderAndRows {
header: string[];
rows: SampleEntry[];
}
export interface ExampleManifest {
name: string;
description: string;
spec: any;
}
export function getCacheRowsFromSampleResponse(sampleResponse: SampleResponse): CacheRows {
return filterMap(sampleResponse.data, d => d.input).slice(0, 20);
}
@ -126,46 +147,6 @@ export function applyCache(sampleSpec: SampleSpec, cacheRows: CacheRows) {
return sampleSpec;
}
export interface HeaderFromSampleResponseOptions {
sampleResponse: SampleResponse;
ignoreTimeColumn?: boolean;
columnOrder?: string[];
suffixColumnOrder?: string[];
useInput?: boolean;
}
export function headerFromSampleResponse(options: HeaderFromSampleResponseOptions): string[] {
const { sampleResponse, ignoreTimeColumn, columnOrder, suffixColumnOrder, useInput } = options;
const key = useInput ? 'input' : 'parsed';
let columns = arrangeWithPrefixSuffix(
dedupe(sampleResponse.data.flatMap(s => (s[key] ? Object.keys(s[key]!) : []))),
columnOrder || [TIME_COLUMN],
suffixColumnOrder || [],
);
if (ignoreTimeColumn) {
columns = columns.filter(c => c !== TIME_COLUMN);
}
return columns;
}
export interface HeaderAndRowsFromSampleResponseOptions extends HeaderFromSampleResponseOptions {
parsedOnly?: boolean;
}
export function headerAndRowsFromSampleResponse(
options: HeaderAndRowsFromSampleResponseOptions,
): SampleHeaderAndRows {
const { sampleResponse, parsedOnly } = options;
return {
header: headerFromSampleResponse(options),
rows: parsedOnly ? sampleResponse.data.filter(d => d.parsed) : sampleResponse.data,
};
}
export async function getProxyOverlordModules(): Promise<string[]> {
let statusResp: any;
try {
@ -185,7 +166,7 @@ export async function postToSampler(
let sampleResp: any;
try {
sampleResp = await Api.instance.post(`${SAMPLER_URL}?for=${forStr}`, sampleSpec);
sampleResp = await Api.instance.post(`/druid/indexer/v1/sampler?for=${forStr}`, sampleSpec);
} catch (e) {
throw new Error(getDruidErrorMessage(e));
}
@ -229,6 +210,23 @@ function fixSamplerTypes(sampleSpec: SampleSpec): SampleSpec {
return sampleSpec;
}
const WHOLE_ROW_INPUT_FORMAT: InputFormat = {
type: 'regex',
pattern: '([\\s\\S]*)', // Match the entire line, every single character
listDelimiter: '56616469-6de2-9da4-efb8-8f416e6e6965', // Just a UUID to disable the list delimiter, let's hope we do not see this UUID in the data
columns: ['raw'],
};
const KAFKA_SAMPLE_INPUT_FORMAT: InputFormat = {
type: 'kafka',
headerFormat: {
type: 'string',
encoding: 'UTF-8',
},
keyFormat: WHOLE_ROW_INPUT_FORMAT,
valueFormat: WHOLE_ROW_INPUT_FORMAT,
};
export async function sampleForConnect(
spec: Partial<IngestionSpec>,
sampleStrategy: SampleStrategy,
@ -242,12 +240,11 @@ export async function sampleForConnect(
const reingestMode = isDruidSource(spec);
if (!reingestMode) {
ioConfig = deepSet(ioConfig, 'inputFormat', {
type: 'regex',
pattern: '([\\s\\S]*)', // Match the entire line, every single character
listDelimiter: '56616469-6de2-9da4-efb8-8f416e6e6965', // Just a UUID to disable the list delimiter, let's hope we do not see this UUID in the data
columns: ['raw'],
});
ioConfig = deepSet(
ioConfig,
'inputFormat',
samplerType === 'kafka' ? KAFKA_SAMPLE_INPUT_FORMAT : WHOLE_ROW_INPUT_FORMAT,
);
}
const sampleSpec: SampleSpec = {
@ -332,7 +329,9 @@ export async function sampleForParser(
dataSchema: {
dataSource: 'sample',
timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC,
dimensionsSpec: {},
dimensionsSpec: {
useSchemaDiscovery: true,
},
granularitySpec: {
rollup: false,
},
@ -359,7 +358,9 @@ export async function sampleForTimestamp(
ioConfig: deepGet(spec, 'spec.ioConfig'),
dataSchema: {
dataSource: 'sample',
dimensionsSpec: {},
dimensionsSpec: {
useSchemaDiscovery: true,
},
timestampSpec: timestampSchema === 'column' ? PLACEHOLDER_TIMESTAMP_SPEC : timestampSpec,
granularitySpec: {
rollup: false,
@ -380,7 +381,7 @@ export async function sampleForTimestamp(
const transforms: Transform[] =
deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || EMPTY_ARRAY;
// If we are trying to parts a column then get a bit fancy:
// If we are trying to parse a column then get a bit fancy:
// Query the same sample again (same cache key)
const sampleSpec: SampleSpec = {
type: samplerType,
@ -388,7 +389,9 @@ export async function sampleForTimestamp(
ioConfig: deepGet(spec, 'spec.ioConfig'),
dataSchema: {
dataSource: 'sample',
dimensionsSpec: {},
dimensionsSpec: {
useSchemaDiscovery: true,
},
timestampSpec,
transformSpec: {
transforms: transforms.filter(transform => transform.name === TIME_COLUMN),
@ -430,8 +433,8 @@ export async function sampleForTransform(
const timestampSpec: TimestampSpec = deepGet(spec, 'spec.dataSchema.timestampSpec');
const transforms: Transform[] = deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || [];
// Extra step to simulate auto detecting dimension with transforms
let specialDimensionSpec: DimensionsSpec = {};
// Extra step to simulate auto-detecting dimension with transforms
let specialDimensionSpec: DimensionsSpec = { useSchemaDiscovery: true };
if (transforms && transforms.length) {
const sampleSpecHack: SampleSpec = {
type: samplerType,
@ -440,7 +443,9 @@ export async function sampleForTransform(
dataSchema: {
dataSource: 'sample',
timestampSpec,
dimensionsSpec: {},
dimensionsSpec: {
useSchemaDiscovery: true,
},
granularitySpec: {
rollup: false,
},
@ -458,10 +463,10 @@ export async function sampleForTransform(
specialDimensionSpec,
'dimensions',
dedupe(
headerFromSampleResponse({
sampleResponse: sampleResponseHack,
ignoreTimeColumn: true,
}).concat(getDimensionNamesFromTransforms(transforms)),
(
guessDimensionsFromSampleResponse(sampleResponseHack) as (DimensionSpec | string)[]
).concat(getDimensionNamesFromTransforms(transforms)),
getDimensionSpecName,
),
);
}
@ -497,8 +502,8 @@ export async function sampleForFilter(
const transforms: Transform[] = deepGet(spec, 'spec.dataSchema.transformSpec.transforms') || [];
const filter: any = deepGet(spec, 'spec.dataSchema.transformSpec.filter');
// Extra step to simulate auto detecting dimension with transforms
let specialDimensionSpec: DimensionsSpec = {};
// Extra step to simulate auto-detecting dimension with transforms
let specialDimensionSpec: DimensionsSpec = { useSchemaDiscovery: true };
if (transforms && transforms.length) {
const sampleSpecHack: SampleSpec = {
type: samplerType,
@ -507,7 +512,9 @@ export async function sampleForFilter(
dataSchema: {
dataSource: 'sample',
timestampSpec,
dimensionsSpec: {},
dimensionsSpec: {
useSchemaDiscovery: true,
},
granularitySpec: {
rollup: false,
},
@ -525,10 +532,9 @@ export async function sampleForFilter(
specialDimensionSpec,
'dimensions',
dedupe(
headerFromSampleResponse({
sampleResponse: sampleResponseHack,
ignoreTimeColumn: true,
}).concat(getDimensionNamesFromTransforms(transforms)),
getHeaderNamesFromSampleResponse(sampleResponseHack, true).concat(
getDimensionNamesFromTransforms(transforms),
),
),
);
}
@ -591,55 +597,3 @@ export async function sampleForSchema(
return postToSampler(applyCache(sampleSpec, cacheRows), 'schema');
}
export async function sampleForExampleManifests(
exampleManifestUrl: string,
): Promise<ExampleManifest[]> {
const exampleSpec: SampleSpec = {
type: 'index_parallel',
spec: {
ioConfig: {
type: 'index_parallel',
inputSource: { type: 'http', uris: [exampleManifestUrl] },
inputFormat: { type: 'tsv', findColumnsFromHeader: true },
},
dataSchema: {
dataSource: 'sample',
timestampSpec: {
column: 'timestamp',
missingValue: '2010-01-01T00:00:00Z',
},
dimensionsSpec: {},
},
},
samplerConfig: { numRows: 50, timeoutMs: 10000 },
};
const exampleData = await postToSampler(exampleSpec, 'example-manifest');
return filterMap(exampleData.data, datum => {
const parsed = datum.parsed;
if (!parsed) return;
let { name, description, spec } = parsed;
try {
spec = JSON.parse(spec);
} catch {
return;
}
if (
typeof name === 'string' &&
typeof description === 'string' &&
spec &&
typeof spec === 'object'
) {
return {
name: parsed.name,
description: parsed.description,
spec: upgradeSpec(spec),
};
} else {
return;
}
});
}

View File

@ -18,7 +18,7 @@
import type { IngestionSpec } from '../druid-models';
import { applyCache, headerFromSampleResponse } from './sampler';
import { applyCache } from './sampler';
describe('utils', () => {
const ingestionSpec: IngestionSpec = {
@ -52,20 +52,6 @@ describe('utils', () => {
},
};
// const cacheRows: CacheRows = [{ make: 'Honda', model: 'Civic' }, { make: 'BMW', model: 'M3' }];
it('spec-utils headerFromSampleResponse', () => {
expect(
headerFromSampleResponse({
sampleResponse: { data: [{ input: { a: 1 }, parsed: { a: 1 } }] },
}),
).toMatchInlineSnapshot(`
Array [
"a",
]
`);
});
it('spec-utils applyCache', () => {
expect(
applyCache(

View File

@ -25,9 +25,9 @@ describe('ExamplePicker', () => {
it('matches snapshot', () => {
const examplePicker = (
<ExamplePicker
exampleManifests={[
{ name: 'Wikipedia', description: 'stuff stuff', spec: {} },
{ name: 'Ex 2', description: 'stuff stuff', spec: {} },
exampleSpecs={[
{ name: 'Wikipedia', description: 'stuff stuff', spec: {} as any },
{ name: 'Ex 2', description: 'stuff stuff', spec: {} as any },
]}
onSelectExample={() => {}}
/>

View File

@ -20,15 +20,15 @@ import { Button, Callout, FormGroup, HTMLSelect, Intent } from '@blueprintjs/cor
import { IconNames } from '@blueprintjs/icons';
import React, { useState } from 'react';
import type { ExampleManifest } from '../../../utils/sampler';
import type { ExampleSpec } from '../example-specs';
export interface ExamplePickerProps {
exampleManifests: ExampleManifest[];
onSelectExample: (exampleManifest: ExampleManifest) => void;
exampleSpecs: ExampleSpec[];
onSelectExample(exampleSpec: ExampleSpec): void;
}
export const ExamplePicker = React.memo(function ExamplePicker(props: ExamplePickerProps) {
const { exampleManifests, onSelectExample } = props;
const { exampleSpecs, onSelectExample } = props;
const [selectedIndex, setSelectedIndex] = useState(0);
return (
@ -39,15 +39,15 @@ export const ExamplePicker = React.memo(function ExamplePicker(props: ExamplePic
value={selectedIndex}
onChange={e => setSelectedIndex(e.target.value as any)}
>
{exampleManifests.map((exampleManifest, i) => (
{exampleSpecs.map((exampleSpec, i) => (
<option key={i} value={i}>
{exampleManifest.name}
{exampleSpec.name}
</option>
))}
</HTMLSelect>
</FormGroup>
<FormGroup>
<Callout>{exampleManifests[selectedIndex].description}</Callout>
<Callout>{exampleSpecs[selectedIndex].description}</Callout>
</FormGroup>
<FormGroup>
<Button
@ -55,7 +55,7 @@ export const ExamplePicker = React.memo(function ExamplePicker(props: ExamplePic
rightIcon={IconNames.ARROW_RIGHT}
intent={Intent.PRIMARY}
onClick={() => {
onSelectExample(exampleManifests[selectedIndex]);
onSelectExample(exampleSpecs[selectedIndex]);
}}
/>
</FormGroup>

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export interface ExampleSpec {
name: string;
description: string;
spec: any;
}
export const EXAMPLE_SPECS: ExampleSpec[] = [
{
name: 'Wikipedia Edits',
description: 'Edits on Wikipedia from one day',
spec: {
type: 'index_parallel',
spec: {
ioConfig: {
type: 'index_parallel',
inputSource: {
type: 'http',
uris: ['https://druid.apache.org/data/wikipedia.json.gz'],
},
},
dataSchema: {
granularitySpec: {
segmentGranularity: 'day',
},
},
tuningConfig: {
type: 'index_parallel',
},
},
},
},
{
name: 'KoalasToTheMax one day',
description: 'One day of flat events from KoalasToTheMax.com (JSON)',
spec: {
type: 'index_parallel',
spec: {
ioConfig: {
type: 'index_parallel',
inputSource: {
type: 'http',
uris: ['https://static.imply.io/example-data/kttm-v2/kttm-v2-2019-08-25.json.gz'],
},
},
dataSchema: {
dataSource: 'kttm1',
granularitySpec: {
segmentGranularity: 'day',
},
},
tuningConfig: {
type: 'index_parallel',
},
},
},
},
{
name: 'KoalasToTheMax one day (nested)',
description: 'One day of nested events from KoalasToTheMax.com (JSON)',
spec: {
type: 'index_parallel',
spec: {
ioConfig: {
type: 'index_parallel',
inputSource: {
type: 'http',
uris: [
'https://static.imply.io/example-data/kttm-nested-v2/kttm-nested-v2-2019-08-25.json.gz',
],
},
},
dataSchema: {
dataSource: 'kttm_nested1',
granularitySpec: {
segmentGranularity: 'day',
},
},
tuningConfig: {
type: 'index_parallel',
},
},
},
},
];

View File

@ -19,23 +19,15 @@
import { render } from '@testing-library/react';
import React from 'react';
import { JSON_SAMPLE } from '../../../utils/sampler.mock';
import { FilterTable } from './filter-table';
describe('FilterTable', () => {
it('matches snapshot', () => {
const sampleData = {
header: ['c1'],
rows: [
{
input: { c1: 'hello' },
parsed: { c1: 'hello' },
},
],
};
const filterTable = (
<FilterTable
sampleData={sampleData}
sampleResponse={JSON_SAMPLE}
columnFilter=""
dimensionFilters={[]}
selectedFilterName={undefined}

View File

@ -30,22 +30,23 @@ import {
STANDARD_TABLE_PAGE_SIZE_OPTIONS,
} from '../../../react-table';
import { caseInsensitiveContains, filterMap } from '../../../utils';
import type { SampleEntry, SampleHeaderAndRows } from '../../../utils/sampler';
import type { SampleEntry, SampleResponse } from '../../../utils/sampler';
import { getHeaderNamesFromSampleResponse } from '../../../utils/sampler';
import './filter-table.scss';
export function filterTableSelectedColumnName(
sampleData: SampleHeaderAndRows,
sampleResponse: SampleResponse,
selectedFilter: Partial<DruidFilter> | undefined,
): string | undefined {
if (!selectedFilter) return;
const selectedFilterName = selectedFilter.dimension;
if (!sampleData.header.includes(selectedFilterName)) return;
if (!getHeaderNamesFromSampleResponse(sampleResponse).includes(selectedFilterName)) return;
return selectedFilterName;
}
export interface FilterTableProps {
sampleData: SampleHeaderAndRows;
sampleResponse: SampleResponse;
columnFilter: string;
dimensionFilters: DruidFilter[];
selectedFilterName: string | undefined;
@ -53,17 +54,18 @@ export interface FilterTableProps {
}
export const FilterTable = React.memo(function FilterTable(props: FilterTableProps) {
const { sampleData, columnFilter, dimensionFilters, selectedFilterName, onFilterSelect } = props;
const { sampleResponse, columnFilter, dimensionFilters, selectedFilterName, onFilterSelect } =
props;
return (
<ReactTable
className={classNames('filter-table', DEFAULT_TABLE_CLASS_NAME)}
data={sampleData.rows}
data={sampleResponse.data}
sortable={false}
defaultPageSize={STANDARD_TABLE_PAGE_SIZE}
pageSizeOptions={STANDARD_TABLE_PAGE_SIZE_OPTIONS}
showPagination={sampleData.rows.length > STANDARD_TABLE_PAGE_SIZE}
columns={filterMap(sampleData.header, (columnName, i) => {
showPagination={sampleResponse.data.length > STANDARD_TABLE_PAGE_SIZE}
columns={filterMap(getHeaderNamesFromSampleResponse(sampleResponse), (columnName, i) => {
if (!caseInsensitiveContains(columnName, columnFilter)) return;
const timestamp = columnName === '__time';
const filterIndex = dimensionFilters.findIndex(f => getFilterDimension(f) === columnName);

View File

@ -57,13 +57,7 @@ export const ConnectMessage = React.memo(function ConnectMessage(props: ConnectM
);
});
export interface ParserMessageProps {
canHaveNestedData: boolean;
}
export const ParserMessage = React.memo(function ParserMessage(props: ParserMessageProps) {
const { canHaveNestedData } = props;
export const ParserMessage = React.memo(function ParserMessage() {
return (
<FormGroup>
<Callout>
@ -71,24 +65,11 @@ export const ParserMessage = React.memo(function ParserMessage(props: ParserMess
Druid needs to parse data as columns. Determine the format of your data and ensure that
the columns are accurately parsed.
</p>
{canHaveNestedData && (
<>
<p>
If you have nested data, you can ingest it into{' '}
<ExternalLink href={`${getLink('DOCS')}/querying/nested-columns.html`}>
COMPLEX&lt;json&gt;
</ExternalLink>{' '}
columns.
If you have nested data, you can ingest it as{' '}
<ExternalLink href={`${getLink('DOCS')}/querying/nested-columns.html`}>json</ExternalLink>{' '}
dimensions.
</p>
<p>
Alternatively, you can explicitly{' '}
<ExternalLink href={`${getLink('DOCS')}/ingestion/index.html#flattenspec`}>
flatten
</ExternalLink>{' '}
it here.
</p>
</>
)}
<LearnMore href={`${getLink('DOCS')}/ingestion/data-formats.html`} />
</Callout>
</FormGroup>

View File

@ -316,4 +316,8 @@ $actual-icon-height: 400px;
width: 100%;
height: 100% !important;
}
.parse-metadata {
border-top: 1px solid $gray1;
}
}

View File

@ -73,6 +73,7 @@ import {
addTimestampTransform,
adjustForceGuaranteedRollup,
adjustId,
BATCH_INPUT_FORMAT_FIELDS,
cleanSpec,
computeFlattenPathsForData,
CONSTANT_TIMESTAMP_SPEC,
@ -95,10 +96,10 @@ import {
getRequiredModule,
getRollup,
getSecondaryPartitionRelatedFormFields,
getSpecType,
getTimestampExpressionFields,
getTimestampSchema,
getTuningFormFields,
INPUT_FORMAT_FIELDS,
inputFormatCanProduceNestedData,
invalidIoConfig,
invalidPartitionConfig,
@ -108,6 +109,7 @@ import {
issueWithIoConfig,
issueWithSampleData,
joinFilter,
KAFKA_METADATA_INPUT_FORMAT_FIELDS,
KNOWN_FILTER_TYPES,
MAX_INLINE_DATA_LENGTH,
METRIC_SPEC_FIELDS,
@ -129,8 +131,10 @@ import { getLink } from '../../links';
import { Api, AppToaster, UrlBaser } from '../../singletons';
import {
alphanumericCompare,
compact,
deepDelete,
deepGet,
deepMove,
deepSet,
deepSetMulti,
EMPTY_ARRAY,
@ -147,19 +151,17 @@ import {
} from '../../utils';
import type {
CacheRows,
ExampleManifest,
SampleEntry,
SampleHeaderAndRows,
SampleResponse,
SampleResponseWithExtraInfo,
SampleStrategy,
} from '../../utils/sampler';
import {
getCacheRowsFromSampleResponse,
getHeaderNamesFromSampleResponse,
getProxyOverlordModules,
headerAndRowsFromSampleResponse,
guessDimensionsFromSampleResponse,
sampleForConnect,
sampleForExampleManifests,
sampleForFilter,
sampleForParser,
sampleForSchema,
@ -168,6 +170,7 @@ import {
} from '../../utils/sampler';
import { ExamplePicker } from './example-picker/example-picker';
import { EXAMPLE_SPECS } from './example-specs';
import { FilterTable, filterTableSelectedColumnName } from './filter-table/filter-table';
import { FormEditor } from './form-editor/form-editor';
import {
@ -213,46 +216,74 @@ function showRawLine(line: SampleEntry): string {
}
function showDruidLine(line: SampleEntry): string {
if (!line.input) return 'Invalid row';
return `Druid row: ${JSONBig.stringify(line.input)}`;
if (!line.input) return 'Invalid druid row';
return `[Druid row: ${JSONBig.stringify(line.input)}]`;
}
function showKafkaLine(line: SampleEntry): string {
const { input } = line;
if (!input) return 'Invalid kafka row';
return compact([
`[ Kafka timestamp: ${input['kafka.timestamp']}`,
...filterMap(Object.entries(input), ([k, v]) => {
if (!k.startsWith('kafka.header.')) return;
return ` Header: ${k.slice(13)}=${v}`;
}),
input['kafka.key'] ? ` Key: ${input['kafka.key']}` : undefined,
` Payload: ${input.raw}`,
']',
]).join('\n');
}
function showBlankLine(line: SampleEntry): string {
return line.parsed ? `[Row: ${JSONBig.stringify(line.parsed)}]` : '[Binary data]';
}
function formatSampleEntries(sampleEntries: SampleEntry[], isDruidSource: boolean): string {
if (sampleEntries.length) {
if (isDruidSource) {
function formatSampleEntries(
sampleEntries: SampleEntry[],
druidSource: boolean,
kafkaSource: boolean,
): string {
if (!sampleEntries.length) return 'No data returned from sampler';
if (druidSource) {
return sampleEntries.map(showDruidLine).join('\n');
}
if (kafkaSource) {
return sampleEntries.map(showKafkaLine).join('\n');
}
return (
sampleEntries.every(l => !l.parsed)
? sampleEntries.map(showBlankLine)
: sampleEntries.map(showRawLine)
).join('\n');
} else {
return 'No data returned from sampler';
}
}
function getTimestampSpec(headerAndRows: SampleHeaderAndRows | null): TimestampSpec {
if (!headerAndRows) return CONSTANT_TIMESTAMP_SPEC;
function getTimestampSpec(sampleResponse: SampleResponse | null): TimestampSpec {
if (!sampleResponse) return CONSTANT_TIMESTAMP_SPEC;
const timestampSpecs = filterMap(headerAndRows.header, sampleHeader => {
const timestampSpecs = filterMap(
getHeaderNamesFromSampleResponse(sampleResponse),
sampleHeader => {
const possibleFormat = possibleDruidFormatForValues(
filterMap(headerAndRows.rows, d => (d.parsed ? d.parsed[sampleHeader] : undefined)),
filterMap(sampleResponse.data, d => (d.parsed ? d.parsed[sampleHeader] : undefined)),
);
if (!possibleFormat) return;
return {
column: sampleHeader,
format: possibleFormat,
};
});
},
);
return (
timestampSpecs.find(ts => /time/i.test(ts.column)) || // Use a suggestion that has time in the name if possible
// Prefer a suggestion that has "time" in the name and is not a numeric format
timestampSpecs.find(
ts => /time/i.test(ts.column) && !NUMERIC_TIME_FORMATS.includes(ts.format),
) ||
timestampSpecs.find(ts => /time/i.test(ts.column)) || // Otherwise anything that has "time" in the name
timestampSpecs.find(ts => !NUMERIC_TIME_FORMATS.includes(ts.format)) || // Use a suggestion that is not numeric
timestampSpecs[0] || // Fall back to the first one
CONSTANT_TIMESTAMP_SPEC // Ok, empty it is...
@ -319,7 +350,6 @@ export interface LoadDataViewProps {
mode: LoadDataViewMode;
initSupervisorId?: string;
initTaskId?: string;
exampleManifestsUrl?: string;
goToIngestion: (taskGroupId: string | undefined, openDialog?: string) => void;
}
@ -342,7 +372,6 @@ export interface LoadDataViewState {
// welcome
overlordModules?: string[];
selectedComboType?: IngestionComboTypeWithExtra;
exampleManifests?: ExampleManifest[];
// general
sampleStrategy: SampleStrategy;
@ -354,30 +383,31 @@ export interface LoadDataViewState {
inputQueryState: QueryState<SampleResponseWithExtraInfo>;
// for parser
parserQueryState: QueryState<SampleHeaderAndRows>;
parserQueryState: QueryState<SampleResponse>;
// for flatten
selectedFlattenField?: SelectedIndex<FlattenField>;
// for timestamp
timestampQueryState: QueryState<{
headerAndRows: SampleHeaderAndRows;
sampleResponse: SampleResponse;
spec: Partial<IngestionSpec>;
}>;
// for transform
transformQueryState: QueryState<SampleHeaderAndRows>;
transformQueryState: QueryState<SampleResponse>;
selectedTransform?: SelectedIndex<Transform>;
// for filter
filterQueryState: QueryState<SampleHeaderAndRows>;
filterQueryState: QueryState<SampleResponse>;
selectedFilter?: SelectedIndex<DruidFilter>;
// for schema
schemaQueryState: QueryState<{
headerAndRows: SampleHeaderAndRows;
sampleResponse: SampleResponse;
dimensions: (string | DimensionSpec)[] | undefined;
metricsSpec: MetricSpec[] | undefined;
definedDimensions: boolean;
}>;
selectedAutoDimension?: string;
selectedDimensionSpec?: SelectedIndex<DimensionSpec>;
@ -613,9 +643,6 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
const { step } = this.state;
switch (step) {
case 'welcome':
return this.queryForWelcome();
case 'connect':
return this.queryForConnect(initRun);
@ -635,6 +662,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
return this.queryForSchema(initRun);
case 'loading':
case 'welcome':
case 'partition':
case 'publish':
case 'tuning':
@ -774,25 +802,6 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
// ==================================================================
async queryForWelcome() {
const { exampleManifestsUrl } = this.props;
if (!exampleManifestsUrl) return;
let exampleManifests: ExampleManifest[] | undefined;
try {
exampleManifests = await sampleForExampleManifests(exampleManifestsUrl);
} catch (e) {
this.setState({
exampleManifests: undefined,
});
return;
}
this.setState({
exampleManifests,
});
}
renderIngestionCard(
comboType: IngestionComboTypeWithExtra,
disabled?: boolean,
@ -831,9 +840,8 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
}
renderWelcomeStep() {
const { mode, exampleManifestsUrl } = this.props;
const { spec, exampleManifests } = this.state;
const noExamples = Boolean(!exampleManifests || !exampleManifests.length);
const { mode } = this.props;
const { spec } = this.state;
const welcomeMessage = this.renderWelcomeStepMessage();
return (
@ -857,7 +865,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
{this.renderIngestionCard('index_parallel:http')}
{this.renderIngestionCard('index_parallel:local')}
{this.renderIngestionCard('index_parallel:inline')}
{exampleManifestsUrl && this.renderIngestionCard('example', noExamples)}
{this.renderIngestionCard('example')}
</>
)}
{this.renderIngestionCard('other')}
@ -879,7 +887,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
}
renderWelcomeStepMessage(): JSX.Element | undefined {
const { selectedComboType, exampleManifests } = this.state;
const { selectedComboType } = this.state;
if (!selectedComboType) {
return <p>Please specify where your raw data is located.</p>;
@ -969,11 +977,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
);
case 'example':
if (exampleManifests && exampleManifests.length) {
return; // Yield to example picker controls
} else {
return <p>Could not load examples.</p>;
}
case 'other':
return (
@ -993,7 +997,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
renderWelcomeStepControls(): JSX.Element | undefined {
const { goToIngestion } = this.props;
const { spec, selectedComboType, exampleManifests } = this.state;
const { spec, selectedComboType } = this.state;
const issue = this.selectedIngestionTypeIssue();
if (issue) return;
@ -1063,12 +1067,11 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
);
case 'example':
if (!exampleManifests) return;
return (
<ExamplePicker
exampleManifests={exampleManifests}
onSelectExample={exampleManifest => {
this.updateSpec(exampleManifest.spec);
exampleSpecs={EXAMPLE_SPECS}
onSelectExample={exampleSpec => {
this.updateSpec(exampleSpec.spec);
this.updateStep('connect');
}}
/>
@ -1211,6 +1214,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;
const inlineMode = deepGet(spec, 'spec.ioConfig.inputSource.type') === 'inline';
const druidSource = isDruidSource(spec);
const kafkaSource = getSpecType(spec) === 'kafka';
let mainFill: JSX.Element | string;
if (inlineMode) {
@ -1242,7 +1246,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
<TextArea
className="raw-lines"
readOnly
value={formatSampleEntries(inputData, druidSource)}
value={formatSampleEntries(inputData, druidSource, kafkaSource)}
/>
)}
{inputQueryState.isLoading() && <Loader />}
@ -1364,11 +1368,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
this.updateSpec(fillDataSourceNameIfNeeded(newSpec));
} else {
const sampleLines = filterMap(inputQueryState.data.data, l =>
l.input ? l.input.raw : undefined,
);
const issue = issueWithSampleData(sampleLines, spec);
const issue = issueWithSampleData(inputData, spec);
if (issue) {
AppToaster.show({
icon: IconNames.WARNING_SIGN,
@ -1379,9 +1379,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
return false;
}
this.updateSpec(
fillDataSourceNameIfNeeded(fillInputFormatIfNeeded(spec, sampleLines)),
);
this.updateSpec(fillDataSourceNameIfNeeded(fillInputFormatIfNeeded(spec, inputData)));
}
return true;
},
@ -1427,10 +1425,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
this.setState(({ parserQueryState }) => ({
cacheRows: getCacheRowsFromSampleResponse(sampleResponse),
parserQueryState: new QueryState({
data: headerAndRowsFromSampleResponse({
sampleResponse,
ignoreTimeColumn: true,
}),
data: sampleResponse,
lastData: parserQueryState.getSomeData(),
}),
}));
@ -1473,7 +1468,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
</div>
{data && (
<ParseDataTable
sampleData={data}
sampleResponse={data}
columnFilter={columnFilter}
canFlatten={canHaveNestedData}
flattenedColumnsOnly={specialColumnsOnly}
@ -1492,31 +1487,85 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
let suggestedFlattenFields: FlattenField[] | undefined;
if (canHaveNestedData && !flattenFields.length && parserQueryState.data) {
suggestedFlattenFields = computeFlattenPathsForData(
filterMap(parserQueryState.data.rows, r => r.input),
filterMap(parserQueryState.data.data, r => r.input),
'ignore-arrays',
);
}
const specType = getSpecType(spec);
const inputFormatFields = isStreamingSpec(spec)
? STREAMING_INPUT_FORMAT_FIELDS
: INPUT_FORMAT_FIELDS;
: BATCH_INPUT_FORMAT_FIELDS;
const normalInputAutoForm = (
<AutoForm
fields={inputFormatFields}
model={inputFormat}
onChange={p => this.updateSpecPreview(deepSet(spec, 'spec.ioConfig.inputFormat', p))}
/>
);
return (
<>
<div className="main">{mainFill}</div>
<div className="control">
<ParserMessage canHaveNestedData={canHaveNestedData} />
<ParserMessage />
{!selectedFlattenField && (
<>
{specType !== 'kafka' ? (
normalInputAutoForm
) : (
<>
{inputFormat?.type !== 'kafka' ? (
normalInputAutoForm
) : (
<AutoForm
fields={inputFormatFields}
model={inputFormat?.valueFormat}
onChange={p =>
this.updateSpecPreview(
deepSet(spec, 'spec.ioConfig.inputFormat.valueFormat', p),
)
}
/>
)}
<FormGroup className="parse-metadata">
<Switch
label="Parse Kafka metadata (ts, headers, key)"
checked={inputFormat?.type === 'kafka'}
onChange={() => {
this.updateSpecPreview(
inputFormat?.type === 'kafka'
? deepMove(
spec,
'spec.ioConfig.inputFormat.valueFormat',
'spec.ioConfig.inputFormat',
)
: deepSet(spec, 'spec.ioConfig.inputFormat', {
type: 'kafka',
valueFormat: inputFormat,
}),
);
}}
/>
</FormGroup>
{inputFormat?.type === 'kafka' && (
<AutoForm
fields={KAFKA_METADATA_INPUT_FORMAT_FIELDS}
model={inputFormat}
onChange={p =>
this.updateSpecPreview(deepSet(spec, 'spec.ioConfig.inputFormat', p))
}
/>
)}
</>
)}
{this.renderApplyButtonBar(
parserQueryState,
AutoForm.issueWithModel(inputFormat, inputFormatFields),
AutoForm.issueWithModel(inputFormat, inputFormatFields) ||
(inputFormat?.type === 'kafka'
? AutoForm.issueWithModel(inputFormat, KAFKA_METADATA_INPUT_FORMAT_FIELDS)
: undefined),
)}
</>
)}
@ -1668,9 +1717,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
this.setState(({ timestampQueryState }) => ({
timestampQueryState: new QueryState({
data: {
headerAndRows: headerAndRowsFromSampleResponse({
sampleResponse,
}),
spec,
},
lastData: timestampQueryState.getSomeData(),
@ -1717,7 +1764,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
columnFilter={columnFilter}
possibleTimestampColumnsOnly={specialColumnsOnly}
selectedColumnName={parseTimeTableSelectedColumnName(
data.headerAndRows,
data.sampleResponse,
timestampSpec,
)}
onTimestampColumnSelect={this.onTimestampColumnSelect}
@ -1858,9 +1905,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
this.setState(({ transformQueryState }) => ({
transformQueryState: new QueryState({
data: headerAndRowsFromSampleResponse({
sampleResponse,
}),
data: sampleResponse,
lastData: transformQueryState.getSomeData(),
}),
}));
@ -1876,7 +1921,8 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
if (transformQueryState.isInit()) {
mainFill = <CenterMessage>Please fill in the previous steps</CenterMessage>;
} else {
const data = transformQueryState.getSomeData();
const sampleResponse = transformQueryState.getSomeData();
mainFill = (
<div className="table-with-control">
<div className="table-control">
@ -1892,13 +1938,16 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
disabled={!transforms.length}
/>
</div>
{data && (
{sampleResponse && (
<TransformTable
sampleData={data}
sampleResponse={sampleResponse}
columnFilter={columnFilter}
transformedColumnsOnly={specialColumnsOnly}
transforms={transforms}
selectedColumnName={transformTableSelectedColumnName(data, selectedTransform?.value)}
selectedColumnName={transformTableSelectedColumnName(
sampleResponse,
selectedTransform?.value,
)}
onTransformSelect={this.onTransformSelect}
/>
)}
@ -2045,10 +2094,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
if (sampleResponse.data.length) {
this.setState(({ filterQueryState }) => ({
filterQueryState: new QueryState({
data: headerAndRowsFromSampleResponse({
sampleResponse,
parsedOnly: true,
}),
data: sampleResponse,
lastData: filterQueryState.getSomeData(),
}),
}));
@ -2067,15 +2113,10 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
return;
}
const headerAndRowsNoFilter = headerAndRowsFromSampleResponse({
sampleResponse: sampleResponseNoFilter,
parsedOnly: true,
});
this.setState(({ filterQueryState }) => ({
// cacheRows: sampleResponseNoFilter.cacheKey,
filterQueryState: new QueryState({
data: deepSet(headerAndRowsNoFilter, 'rows', []),
data: sampleResponseNoFilter,
lastData: filterQueryState.getSomeData(),
}),
}));
@ -2095,7 +2136,8 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
if (filterQueryState.isInit()) {
mainFill = <CenterMessage>Please enter more details for the previous steps</CenterMessage>;
} else {
const data = filterQueryState.getSomeData();
const filterQuery = filterQueryState.getSomeData();
mainFill = (
<div className="table-with-control">
<div className="table-control">
@ -2105,12 +2147,12 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
placeholder="Search columns"
/>
</div>
{data && (
{filterQuery && (
<FilterTable
sampleData={data}
sampleResponse={filterQuery}
columnFilter={columnFilter}
dimensionFilters={dimensionFilters}
selectedFilterName={filterTableSelectedColumnName(data, selectedFilter?.value)}
selectedFilterName={filterTableSelectedColumnName(filterQuery, selectedFilter?.value)}
onFilterSelect={this.onFilterSelect}
/>
)}
@ -2243,15 +2285,10 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
this.setState(({ schemaQueryState }) => ({
schemaQueryState: new QueryState({
data: {
headerAndRows: headerAndRowsFromSampleResponse({
sampleResponse,
columnOrder: [TIME_COLUMN].concat(
dimensions ? dimensions.map(getDimensionSpecName) : [],
),
suffixColumnOrder: metricsSpec ? metricsSpec.map(getMetricSpecName) : undefined,
}),
dimensions,
dimensions: dimensions || guessDimensionsFromSampleResponse(sampleResponse),
metricsSpec,
definedDimensions: Boolean(dimensions),
},
lastData: schemaQueryState.getSomeData(),
}),
@ -2569,13 +2606,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
action={async () => {
const sampleResponse = await sampleForTransform(spec, cacheRows);
this.updateSpec(
updateSchemaWithSample(
spec,
headerAndRowsFromSampleResponse({ sampleResponse }),
getDimensionMode(spec),
newRollup,
true,
),
updateSchemaWithSample(spec, sampleResponse, getDimensionMode(spec), newRollup, true),
);
}}
confirmButtonText={`Yes - ${newRollup ? 'enable' : 'disable'} rollup`}
@ -2600,12 +2631,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
action={async () => {
const sampleResponse = await sampleForTransform(spec, cacheRows);
this.updateSpec(
updateSchemaWithSample(
spec,
headerAndRowsFromSampleResponse({ sampleResponse }),
newDimensionMode,
getRollup(spec),
),
updateSchemaWithSample(spec, sampleResponse, newDimensionMode, getRollup(spec)),
);
}}
confirmButtonText={`Yes - ${autoDetect ? 'auto detect' : 'explicitly set'} columns`}

View File

@ -19,23 +19,15 @@
import { render } from '@testing-library/react';
import React from 'react';
import { JSON_SAMPLE } from '../../../utils/sampler.mock';
import { ParseDataTable } from './parse-data-table';
describe('ParseDataTable', () => {
it('matches snapshot', () => {
const sampleData = {
header: ['c1'],
rows: [
{
input: { c1: 'hello' },
parsed: { c1: 'hello' },
},
],
};
const parseDataTable = (
<ParseDataTable
sampleData={sampleData}
sampleResponse={JSON_SAMPLE}
columnFilter=""
canFlatten={false}
flattenedColumnsOnly={false}

View File

@ -30,12 +30,13 @@ import {
STANDARD_TABLE_PAGE_SIZE_OPTIONS,
} from '../../../react-table';
import { caseInsensitiveContains, filterMap } from '../../../utils';
import type { SampleEntry, SampleHeaderAndRows } from '../../../utils/sampler';
import type { SampleEntry, SampleResponse } from '../../../utils/sampler';
import { getHeaderNamesFromSampleResponse } from '../../../utils/sampler';
import './parse-data-table.scss';
export interface ParseDataTableProps {
sampleData: SampleHeaderAndRows;
sampleResponse: SampleResponse;
columnFilter: string;
canFlatten: boolean;
flattenedColumnsOnly: boolean;
@ -46,7 +47,7 @@ export interface ParseDataTableProps {
export const ParseDataTable = React.memo(function ParseDataTable(props: ParseDataTableProps) {
const {
sampleData,
sampleResponse,
columnFilter,
canFlatten,
flattenedColumnsOnly,
@ -59,12 +60,14 @@ export const ParseDataTable = React.memo(function ParseDataTable(props: ParseDat
return (
<ReactTable
className={classNames('parse-data-table', DEFAULT_TABLE_CLASS_NAME)}
data={sampleData.rows}
data={sampleResponse.data}
sortable={false}
defaultPageSize={STANDARD_TABLE_PAGE_SIZE}
pageSizeOptions={STANDARD_TABLE_PAGE_SIZE_OPTIONS}
showPagination={sampleData.rows.length > STANDARD_TABLE_PAGE_SIZE}
columns={filterMap(sampleData.header, (columnName, i) => {
showPagination={sampleResponse.data.length > STANDARD_TABLE_PAGE_SIZE}
columns={filterMap(
getHeaderNamesFromSampleResponse(sampleResponse, true),
(columnName, i) => {
if (!caseInsensitiveContains(columnName, columnFilter)) return;
const flattenFieldIndex = flattenFields.findIndex(f => f.name === columnName);
if (flattenFieldIndex === -1 && flattenedColumnsOnly) return;
@ -97,7 +100,8 @@ export const ParseDataTable = React.memo(function ParseDataTable(props: ParseDat
flattened: flattenField,
}),
};
})}
},
)}
SubComponent={rowInfo => {
const { input, error } = rowInfo.original;
const inputStr = JSONBig.stringify(input, undefined, 2);

View File

@ -22,21 +22,12 @@ import React from 'react';
import type { IngestionSpec } from '../../../druid-models';
import { PLACEHOLDER_TIMESTAMP_SPEC } from '../../../druid-models';
import { deepSet } from '../../../utils';
import { JSON_SAMPLE } from '../../../utils/sampler.mock';
import { ParseTimeTable } from './parse-time-table';
describe('ParseTimeTable', () => {
it('matches snapshot', () => {
const sampleData = {
header: ['c1'],
rows: [
{
input: { c1: 'hello' },
parsed: { c1: 'hello' },
},
],
};
const spec = deepSet(
{} as IngestionSpec,
'spec.dataSchema.timestampSpec',
@ -46,7 +37,7 @@ describe('ParseTimeTable', () => {
const parseTimeTable = (
<ParseTimeTable
sampleBundle={{
headerAndRows: sampleData,
sampleResponse: JSON_SAMPLE,
spec,
}}
columnFilter=""

View File

@ -34,23 +34,29 @@ import {
STANDARD_TABLE_PAGE_SIZE_OPTIONS,
} from '../../../react-table';
import { caseInsensitiveContains, filterMap } from '../../../utils';
import type { SampleEntry, SampleHeaderAndRows } from '../../../utils/sampler';
import type { SampleEntry, SampleResponse } from '../../../utils/sampler';
import { getHeaderNamesFromSampleResponse } from '../../../utils/sampler';
import './parse-time-table.scss';
export function parseTimeTableSelectedColumnName(
sampleData: SampleHeaderAndRows,
sampleResponse: SampleResponse,
timestampSpec: TimestampSpec | undefined,
): string | undefined {
if (!timestampSpec) return;
const timestampColumn = timestampSpec.column;
if (!timestampColumn || !sampleData.header.includes(timestampColumn)) return;
if (
!timestampColumn ||
!getHeaderNamesFromSampleResponse(sampleResponse).includes(timestampColumn)
) {
return;
}
return timestampColumn;
}
export interface ParseTimeTableProps {
sampleBundle: {
headerAndRows: SampleHeaderAndRows;
sampleResponse: SampleResponse;
spec: Partial<IngestionSpec>;
};
columnFilter: string;
@ -67,28 +73,26 @@ export const ParseTimeTable = React.memo(function ParseTimeTable(props: ParseTim
selectedColumnName,
onTimestampColumnSelect,
} = props;
const { headerAndRows, spec } = sampleBundle;
const { sampleResponse, spec } = sampleBundle;
const timestampSpecColumn = getTimestampSpecColumnFromSpec(spec);
const timestampDetail = getTimestampDetailFromSpec(spec);
return (
<ReactTable
className={classNames('parse-time-table', DEFAULT_TABLE_CLASS_NAME)}
data={headerAndRows.rows}
data={sampleResponse.data}
sortable={false}
defaultPageSize={STANDARD_TABLE_PAGE_SIZE}
pageSizeOptions={STANDARD_TABLE_PAGE_SIZE_OPTIONS}
showPagination={headerAndRows.rows.length > STANDARD_TABLE_PAGE_SIZE}
columns={filterMap(
headerAndRows.header.length ? headerAndRows.header : ['__error__'],
(columnName, i) => {
showPagination={sampleResponse.data.length > STANDARD_TABLE_PAGE_SIZE}
columns={filterMap(getHeaderNamesFromSampleResponse(sampleResponse), (columnName, i) => {
const isTimestamp = columnName === '__time';
if (!isTimestamp && !caseInsensitiveContains(columnName, columnFilter)) return;
const used = timestampSpecColumn === columnName;
const possibleFormat = isTimestamp
? null
: possibleDruidFormatForValues(
filterMap(headerAndRows.rows, d => (d.parsed ? d.parsed[columnName] : undefined)),
filterMap(sampleResponse.data, d => (d.parsed ? d.parsed[columnName] : undefined)),
);
if (possibleTimestampColumnsOnly && !isTimestamp && !possibleFormat) return;
@ -135,8 +139,7 @@ export const ParseTimeTable = React.memo(function ParseTimeTable(props: ParseTim
width: isTimestamp ? 200 : 140,
resizable: !isTimestamp,
};
},
)}
})}
/>
);
});

View File

@ -19,26 +19,19 @@
import { render } from '@testing-library/react';
import React from 'react';
import { JSON_SAMPLE } from '../../../utils/sampler.mock';
import { SchemaTable } from './schema-table';
describe('SchemaTable', () => {
it('matches snapshot', () => {
const sampleData = {
header: ['c1'],
rows: [
{
input: { c1: 'hello' },
parsed: { c1: 'hello' },
},
],
};
const schemaTable = (
<SchemaTable
sampleBundle={{
headerAndRows: sampleData,
sampleResponse: JSON_SAMPLE,
dimensions: [],
metricsSpec: [],
definedDimensions: false,
}}
columnFilter=""
selectedAutoDimension={undefined}

View File

@ -35,15 +35,17 @@ import {
STANDARD_TABLE_PAGE_SIZE_OPTIONS,
} from '../../../react-table';
import { caseInsensitiveContains, filterMap } from '../../../utils';
import type { SampleEntry, SampleHeaderAndRows } from '../../../utils/sampler';
import type { SampleEntry, SampleResponse } from '../../../utils/sampler';
import { getHeaderNamesFromSampleResponse } from '../../../utils/sampler';
import './schema-table.scss';
export interface SchemaTableProps {
sampleBundle: {
headerAndRows: SampleHeaderAndRows;
sampleResponse: SampleResponse;
dimensions: (string | DimensionSpec)[] | undefined;
metricsSpec: MetricSpec[] | undefined;
definedDimensions: boolean;
};
columnFilter: string;
selectedAutoDimension: string | undefined;
@ -65,17 +67,17 @@ export const SchemaTable = React.memo(function SchemaTable(props: SchemaTablePro
onDimensionSelect,
onMetricSelect,
} = props;
const { headerAndRows, dimensions, metricsSpec } = sampleBundle;
const { sampleResponse, dimensions, metricsSpec, definedDimensions } = sampleBundle;
return (
<ReactTable
className={classNames('schema-table', DEFAULT_TABLE_CLASS_NAME)}
data={headerAndRows.rows}
data={sampleResponse.data}
sortable={false}
defaultPageSize={STANDARD_TABLE_PAGE_SIZE}
pageSizeOptions={STANDARD_TABLE_PAGE_SIZE_OPTIONS}
showPagination={headerAndRows.rows.length > STANDARD_TABLE_PAGE_SIZE}
columns={filterMap(headerAndRows.header, (columnName, i) => {
showPagination={sampleResponse.data.length > STANDARD_TABLE_PAGE_SIZE}
columns={filterMap(getHeaderNamesFromSampleResponse(sampleResponse), (columnName, i) => {
if (!caseInsensitiveContains(columnName, columnFilter)) return;
const metricSpecIndex = metricsSpec
@ -130,7 +132,7 @@ export const SchemaTable = React.memo(function SchemaTable(props: SchemaTablePro
onClick={() => {
if (isTimestamp) return;
if (dimensionSpec) {
if (definedDimensions && dimensionSpec) {
onDimensionSelect(inflateDimensionSpec(dimensionSpec), dimensionSpecIndex);
} else {
onAutoDimensionSelect(columnName);
@ -139,7 +141,7 @@ export const SchemaTable = React.memo(function SchemaTable(props: SchemaTablePro
>
<div className="column-name">{columnName}</div>
<div className="column-detail">
{isTimestamp ? 'long (time column)' : dimensionSpecType || 'string (auto)'}&nbsp;
{isTimestamp ? 'long (time column)' : dimensionSpecType || '(auto)'}&nbsp;
</div>
</div>
),

View File

@ -19,23 +19,15 @@
import { render } from '@testing-library/react';
import React from 'react';
import { JSON_SAMPLE } from '../../../utils/sampler.mock';
import { TransformTable } from './transform-table';
describe('TransformTable', () => {
it('matches snapshot', () => {
const sampleData = {
header: ['c1'],
rows: [
{
input: { c1: 'hello' },
parsed: { c1: 'hello' },
},
],
};
const transformTable = (
<TransformTable
sampleData={sampleData}
sampleResponse={JSON_SAMPLE}
columnFilter=""
transformedColumnsOnly={false}
transforms={[]}

View File

@ -30,22 +30,28 @@ import {
} from '../../../react-table';
import { caseInsensitiveContains, filterMap } from '../../../utils';
import { escapeColumnName } from '../../../utils/druid-expression';
import type { SampleEntry, SampleHeaderAndRows } from '../../../utils/sampler';
import type { SampleEntry, SampleResponse } from '../../../utils/sampler';
import { getHeaderNamesFromSampleResponse } from '../../../utils/sampler';
import './transform-table.scss';
export function transformTableSelectedColumnName(
sampleData: SampleHeaderAndRows,
sampleResponse: SampleResponse,
selectedTransform: Partial<Transform> | undefined,
): string | undefined {
if (!selectedTransform) return;
const selectedTransformName = selectedTransform.name;
if (selectedTransformName && !sampleData.header.includes(selectedTransformName)) return;
if (
selectedTransformName &&
!getHeaderNamesFromSampleResponse(sampleResponse).includes(selectedTransformName)
) {
return;
}
return selectedTransformName;
}
export interface TransformTableProps {
sampleData: SampleHeaderAndRows;
sampleResponse: SampleResponse;
columnFilter: string;
transformedColumnsOnly: boolean;
transforms: Transform[];
@ -55,7 +61,7 @@ export interface TransformTableProps {
export const TransformTable = React.memo(function TransformTable(props: TransformTableProps) {
const {
sampleData,
sampleResponse,
columnFilter,
transformedColumnsOnly,
transforms,
@ -66,12 +72,12 @@ export const TransformTable = React.memo(function TransformTable(props: Transfor
return (
<ReactTable
className={classNames('transform-table', DEFAULT_TABLE_CLASS_NAME)}
data={sampleData.rows}
data={sampleResponse.data}
sortable={false}
defaultPageSize={STANDARD_TABLE_PAGE_SIZE}
pageSizeOptions={STANDARD_TABLE_PAGE_SIZE_OPTIONS}
showPagination={sampleData.rows.length > STANDARD_TABLE_PAGE_SIZE}
columns={filterMap(sampleData.header, (columnName, i) => {
showPagination={sampleResponse.data.length > STANDARD_TABLE_PAGE_SIZE}
columns={filterMap(getHeaderNamesFromSampleResponse(sampleResponse), (columnName, i) => {
if (!caseInsensitiveContains(columnName, columnFilter)) return;
const timestamp = columnName === '__time';
const transformIndex = transforms.findIndex(f => f.name === columnName);

View File

@ -32,21 +32,29 @@ import { IconNames } from '@blueprintjs/icons';
import { Popover2 } from '@blueprintjs/popover2';
import classNames from 'classnames';
import { select, selectAll } from 'd3-selection';
import type { QueryResult } from 'druid-query-toolkit';
import { C, F, QueryRunner, SqlExpression, SqlQuery } from 'druid-query-toolkit';
import {
C,
Column,
F,
QueryResult,
QueryRunner,
SqlExpression,
SqlQuery,
SqlType,
} from 'druid-query-toolkit';
import React, { useCallback, useEffect, useLayoutEffect, useMemo, useRef, useState } from 'react';
import { ClearableInput, LearnMore, Loader } from '../../../components';
import { AsyncActionDialog } from '../../../dialogs';
import type { Execution, IngestQueryPattern } from '../../../druid-models';
import type { Execution, ExternalConfig, IngestQueryPattern } from '../../../druid-models';
import {
changeQueryPatternExpression,
externalConfigToTableExpression,
fitIngestQueryPattern,
getDestinationMode,
getQueryPatternExpression,
getQueryPatternExpressionType,
ingestQueryPatternToQuery,
PLACEHOLDER_TIMESTAMP_SPEC,
possibleDruidFormatForValues,
TIME_COLUMN,
WorkbenchQueryPart,
@ -57,6 +65,7 @@ import {
submitTaskQuery,
} from '../../../helpers';
import { useLastDefined, usePermanentCallback, useQueryManager } from '../../../hooks';
import { useLastDefinedDeep } from '../../../hooks/use-last-defined-deep';
import { getLink } from '../../../links';
import { AppToaster } from '../../../singletons';
import type { QueryAction } from '../../../utils';
@ -64,6 +73,7 @@ import {
caseInsensitiveContains,
change,
dataTypeToIcon,
deepSet,
DruidError,
filterMap,
oneOf,
@ -74,6 +84,7 @@ import {
wait,
without,
} from '../../../utils';
import { postToSampler } from '../../../utils/sampler';
import { FlexibleQueryInput } from '../../workbench-view/flexible-query-input/flexible-query-input';
import { ColumnActions } from '../column-actions/column-actions';
import { ColumnEditor } from '../column-editor/column-editor';
@ -375,28 +386,67 @@ export const SchemaStep = function SchemaStep(props: SchemaStepProps) {
},
});
const sampleQueryString = useLastDefined(
const sampleExternalConfig = useLastDefinedDeep(
ingestQueryPattern && mode !== 'sql' // Only sample the data if we are not in the SQL tab live editing the SQL
? SqlQuery.create(
externalConfigToTableExpression(ingestQueryPattern.mainExternalConfig),
).toString()
? ingestQueryPattern.mainExternalConfig
: undefined,
);
const [sampleState] = useQueryManager<string, QueryResult, Execution>({
query: sampleQueryString,
processQuery: async (sampleQueryString, cancelToken) => {
return extractResult(
await submitTaskQuery({
query: sampleQueryString,
context: {
sqlOuterLimit: 50,
const [sampleState] = useQueryManager<ExternalConfig, QueryResult, Execution>({
query: sampleExternalConfig,
processQuery: async sampleExternalConfig => {
const sampleResponse = await postToSampler(
{
type: 'index_parallel',
spec: {
ioConfig: {
type: 'index_parallel',
inputSource: sampleExternalConfig.inputSource,
inputFormat: deepSet(sampleExternalConfig.inputFormat, 'keepNullColumns', true),
},
cancelToken,
dataSchema: {
dataSource: 'sample',
timestampSpec: PLACEHOLDER_TIMESTAMP_SPEC,
dimensionsSpec: {
dimensions: sampleExternalConfig.signature.map(s => {
const t = s.columnType.getNativeType();
return {
name: s.getColumnName(),
type: t === 'COMPLEX<json>' ? 'json' : t,
};
}),
);
},
backgroundStatusCheck: executionBackgroundResultStatusCheck,
granularitySpec: {
rollup: false,
},
},
},
samplerConfig: {
numRows: 50,
timeoutMs: 15000,
},
},
'sample',
);
const columns = filterMap(sampleResponse.logicalSegmentSchema, ({ name, type }) => {
if (name === '__time') return;
return new Column({
name,
nativeType: type,
sqlType: SqlType.fromNativeType(type).toString(),
});
});
return new QueryResult({
header: columns,
rows: filterMap(sampleResponse.data, r => {
const { parsed } = r;
if (!parsed) return;
return columns.map(({ name }) => parsed[name]);
}),
});
},
});
const sampleDataQuery = useMemo(() => {

View File

@ -25,9 +25,9 @@ import React, { useState } from 'react';
import { AutoForm, CenterMessage, LearnMore, Loader } from '../../../components';
import type { InputFormat, InputSource } from '../../../druid-models';
import {
guessColumnTypeFromHeaderAndRows,
guessIsArrayFromHeaderAndRows,
INPUT_FORMAT_FIELDS,
BATCH_INPUT_FORMAT_FIELDS,
guessColumnTypeFromSampleResponse,
guessIsArrayFromSampleResponse,
inputFormatOutputsNumericStrings,
PLACEHOLDER_TIMESTAMP_SPEC,
possibleDruidFormatForValues,
@ -41,8 +41,8 @@ import {
filterMap,
timeFormatToSql,
} from '../../../utils';
import type { SampleHeaderAndRows, SampleSpec } from '../../../utils/sampler';
import { headerAndRowsFromSampleResponse, postToSampler } from '../../../utils/sampler';
import type { SampleResponse, SampleSpec } from '../../../utils/sampler';
import { getHeaderNamesFromSampleResponse, postToSampler } from '../../../utils/sampler';
import { ParseDataTable } from '../../load-data-view/parse-data-table/parse-data-table';
import './input-format-step.scss';
@ -76,11 +76,11 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
const [inputFormat, setInputFormat] = useState<Partial<InputFormat>>(initInputFormat);
const [inputFormatToSample, setInputFormatToSample] = useState<InputFormat | undefined>(
AutoForm.isValidModel(initInputFormat, INPUT_FORMAT_FIELDS) ? initInputFormat : undefined,
AutoForm.isValidModel(initInputFormat, BATCH_INPUT_FORMAT_FIELDS) ? initInputFormat : undefined,
);
const [selectTimestamp, setSelectTimestamp] = useState(true);
const [previewState] = useQueryManager<InputFormat, SampleHeaderAndRows>({
const [previewState] = useQueryManager<InputFormat, SampleResponse>({
query: inputFormatToSample,
processQuery: async (inputFormat: InputFormat) => {
const sampleSpec: SampleSpec = {
@ -94,7 +94,9 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
dataSchema: {
dataSource: 'sample',
timestampSpec: PLACEHOLDER_TIMESTAMP_SPEC,
dimensionsSpec: {},
dimensionsSpec: {
useSchemaDiscovery: true,
},
granularitySpec: {
rollup: false,
},
@ -106,22 +108,18 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
},
};
const sampleResponse = await postToSampler(sampleSpec, 'input-format-step');
return headerAndRowsFromSampleResponse({
sampleResponse,
ignoreTimeColumn: true,
useInput: true,
});
return await postToSampler(sampleSpec, 'input-format-step');
},
});
const previewData = previewState.data;
const previewSampleResponse = previewState.data;
let possibleTimeExpression: PossibleTimeExpression | undefined;
if (previewData) {
possibleTimeExpression = filterMap(previewData.header, column => {
const values = filterMap(previewData.rows, row => row.input?.[column]);
if (previewSampleResponse) {
possibleTimeExpression = filterMap(
getHeaderNamesFromSampleResponse(previewSampleResponse),
column => {
const values = filterMap(previewSampleResponse.data, d => d.input?.[column]);
const possibleDruidFormat = possibleDruidFormatForValues(values);
if (!possibleDruidFormat) return;
@ -132,27 +130,28 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
column,
timeExpression: formatSql.fillPlaceholders([C(column)]),
};
})[0];
},
)[0];
}
const inputFormatAndMore =
previewData && AutoForm.isValidModel(inputFormat, INPUT_FORMAT_FIELDS)
previewSampleResponse && AutoForm.isValidModel(inputFormat, BATCH_INPUT_FORMAT_FIELDS)
? {
inputFormat,
signature: previewData.header.map(name =>
signature: getHeaderNamesFromSampleResponse(previewSampleResponse, true).map(name =>
SqlColumnDeclaration.create(
name,
SqlType.fromNativeType(
guessColumnTypeFromHeaderAndRows(
previewData,
guessColumnTypeFromSampleResponse(
previewSampleResponse,
name,
inputFormatOutputsNumericStrings(inputFormat),
),
),
),
),
isArrays: previewData.header.map(name =>
guessIsArrayFromHeaderAndRows(previewData, name),
isArrays: getHeaderNamesFromSampleResponse(previewSampleResponse, true).map(name =>
guessIsArrayFromSampleResponse(previewSampleResponse, name),
),
timeExpression: selectTimestamp ? possibleTimeExpression?.timeExpression : undefined,
}
@ -171,9 +170,9 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
{previewState.error && (
<CenterMessage>{`Error: ${previewState.getErrorMessage()}`}</CenterMessage>
)}
{previewData && (
{previewSampleResponse && (
<ParseDataTable
sampleData={previewData}
sampleResponse={previewSampleResponse}
columnFilter=""
canFlatten={false}
flattenedColumnsOnly={false}
@ -191,15 +190,19 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
<LearnMore href={`${getLink('DOCS')}/ingestion/data-formats.html`} />
</Callout>
</FormGroup>
<AutoForm fields={INPUT_FORMAT_FIELDS} model={inputFormat} onChange={setInputFormat} />
<AutoForm
fields={BATCH_INPUT_FORMAT_FIELDS}
model={inputFormat}
onChange={setInputFormat}
/>
{inputFormatToSample !== inputFormat && (
<FormGroup className="control-buttons">
<Button
text="Preview changes"
intent={Intent.PRIMARY}
disabled={!AutoForm.isValidModel(inputFormat, INPUT_FORMAT_FIELDS)}
disabled={!AutoForm.isValidModel(inputFormat, BATCH_INPUT_FORMAT_FIELDS)}
onClick={() => {
if (!AutoForm.isValidModel(inputFormat, INPUT_FORMAT_FIELDS)) return;
if (!AutoForm.isValidModel(inputFormat, BATCH_INPUT_FORMAT_FIELDS)) return;
setInputFormatToSample(inputFormat);
}}
/>

View File

@ -39,7 +39,7 @@ import {
externalConfigToTableExpression,
getIngestionImage,
getIngestionTitle,
guessInputFormat,
guessSimpleInputFormat,
INPUT_SOURCE_FIELDS,
PLACEHOLDER_TIMESTAMP_SPEC,
} from '../../../druid-models';
@ -61,7 +61,7 @@ import './input-source-step.scss';
function resultToInputFormat(result: QueryResult): InputFormat {
if (!result.rows.length) throw new Error('No data returned from sample query');
return guessInputFormat(result.rows.map((r: any) => r[0]));
return guessSimpleInputFormat(result.rows.map((r: any) => r[0]));
}
const BOGUS_LIST_DELIMITER = '56616469-6de2-9da4-efb8-8f416e6e6965'; // Just a UUID to disable the list delimiter, let's hope we do not see this UUID in the data
@ -129,7 +129,7 @@ export const InputSourceStep = React.memo(function InputSourceStep(props: InputS
);
if (!sampleLines.length) throw new Error('No data returned from sampler');
guessedInputFormat = guessInputFormat(sampleLines);
guessedInputFormat = guessSimpleInputFormat(sampleLines);
} else {
const tableExpression = externalConfigToTableExpression({
inputSource,