Web console: support new ingest spec format (#8828)

* converter v1

* working v1

* update tests

* update tests

* upgrades

* adjust to new API

* remove hack

* fwd

* step

* neo cache

* fix time selection

* smart reset

* parquest autodetection

* add binaryAsString option

* partitionsSpec

* add ORC support

* ingestSegment -> druid

* remove index tasks

* better min

* load data works

* remove downgrade

* filter on group_id

* fix group_id in test

* update auto form for new props

* add dropBeforeByPeriod rule

* simplify

* prettify json
This commit is contained in:
Vadim Ogievetsky 2019-12-04 20:21:07 -08:00 committed by Fangjin Yang
parent 8dd9a8cb15
commit 1cff73f3e0
34 changed files with 1386 additions and 1126 deletions

View File

Before

Width:  |  Height:  |  Size: 8.4 KiB

After

Width:  |  Height:  |  Size: 8.4 KiB

View File

Before

Width:  |  Height:  |  Size: 19 KiB

After

Width:  |  Height:  |  Size: 19 KiB

View File

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 11 KiB

View File

@ -23,7 +23,7 @@ import { compact } from '../../utils';
export interface ArrayInputProps {
className?: string;
values: string[];
values: string[] | undefined;
onChange: (newValues: string[] | undefined) => void;
placeholder?: string;
large?: boolean;
@ -40,8 +40,11 @@ export const ArrayInput = React.memo(function ArrayInput(props: ArrayInputProps)
const stringValue = e.target.value;
const newValues: string[] = stringValue.split(/[,\s]+/).map((v: string) => v.trim());
const newValuesFiltered = compact(newValues);
if (newValues.length === newValuesFiltered.length) {
onChange(stringValue === '' ? undefined : newValuesFiltered);
if (stringValue === '') {
onChange(undefined);
setStringValue(undefined);
} else if (newValues.length === newValuesFiltered.length) {
onChange(newValuesFiltered);
setStringValue(undefined);
} else {
setStringValue(stringValue);
@ -51,7 +54,7 @@ export const ArrayInput = React.memo(function ArrayInput(props: ArrayInputProps)
return (
<TextArea
className={className}
value={stringValue || props.values.join(', ')}
value={stringValue || (props.values || []).join(', ')}
onChange={handleChange}
placeholder={placeholder}
large={large}

View File

@ -25,11 +25,13 @@ import { FormGroupWithInfo } from '../form-group-with-info/form-group-with-info'
import { IntervalInput } from '../interval-input/interval-input';
import { JsonInput } from '../json-input/json-input';
import { PopoverText } from '../popover-text/popover-text';
import { SuggestibleInput, SuggestionGroup } from '../suggestible-input/suggestible-input';
import { SuggestibleInput, Suggestion } from '../suggestible-input/suggestible-input';
import './auto-form.scss';
export interface Field<T> {
export type Functor<M, R> = R | ((model: M) => R);
export interface Field<M> {
name: string;
label?: string;
info?: React.ReactNode;
@ -43,12 +45,14 @@ export interface Field<T> {
| 'json'
| 'interval';
defaultValue?: any;
suggestions?: (string | SuggestionGroup)[];
suggestions?: Functor<M, Suggestion[]>;
placeholder?: string;
min?: number;
disabled?: boolean | ((model: T) => boolean);
defined?: boolean | ((model: T) => boolean);
required?: boolean | ((model: T) => boolean);
zeroMeansUndefined?: boolean;
disabled?: Functor<M, boolean>;
defined?: Functor<M, boolean>;
required?: Functor<M, boolean>;
adjustment?: (model: M) => M;
}
export interface AutoFormProps<T> {
@ -73,21 +77,16 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
return newLabel;
}
static evaluateFunctor<T>(
functor: undefined | boolean | ((model: T) => boolean),
model: T | undefined,
defaultValue = false,
): boolean {
static evaluateFunctor<M, R>(
functor: undefined | Functor<M, R>,
model: M | undefined,
defaultValue: R,
): R {
if (!model || functor == null) return defaultValue;
switch (typeof functor) {
case 'boolean':
return functor;
case 'function':
return functor(model);
default:
throw new TypeError(`invalid functor`);
if (typeof functor === 'function') {
return (functor as any)(model);
} else {
return functor;
}
}
@ -109,27 +108,42 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
};
private modelChange = (newModel: T) => {
const { fields, onChange } = this.props;
const { fields, onChange, model } = this.props;
// Delete things that are not defined now (but were defined prior to the change)
for (const someField of fields) {
if (!AutoForm.evaluateFunctor(someField.defined, newModel, true)) {
if (
!AutoForm.evaluateFunctor(someField.defined, newModel, true) &&
AutoForm.evaluateFunctor(someField.defined, model, true)
) {
newModel = deepDelete(newModel, someField.name);
}
}
// Perform any adjustments if needed
for (const someField of fields) {
if (someField.adjustment) {
newModel = someField.adjustment(newModel);
}
}
onChange(newModel);
};
private renderNumberInput(field: Field<T>): JSX.Element {
const { model, large, onFinalize } = this.props;
const modelValue = deepGet(model as any, field.name) || field.defaultValue;
let modelValue = deepGet(model as any, field.name);
if (typeof modelValue !== 'number') modelValue = field.defaultValue;
return (
<NumericInput
value={modelValue}
onValueChange={(valueAsNumber: number, valueAsString: string) => {
if (valueAsString === '' || isNaN(valueAsNumber)) return;
this.fieldChange(field, valueAsNumber);
this.fieldChange(
field,
valueAsNumber === 0 && field.zeroMeansUndefined ? undefined : valueAsNumber,
);
}}
onBlur={e => {
if (e.target.value === '') {
@ -140,10 +154,10 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
min={field.min || 0}
fill
large={large}
disabled={AutoForm.evaluateFunctor(field.disabled, model)}
disabled={AutoForm.evaluateFunctor(field.disabled, model, false)}
placeholder={field.placeholder}
intent={
AutoForm.evaluateFunctor(field.required, model) && modelValue == null
AutoForm.evaluateFunctor(field.required, model, false) && modelValue == null
? AutoForm.REQUIRED_INTENT
: undefined
}
@ -169,7 +183,7 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
majorStepSize={1000000}
fill
large={large}
disabled={AutoForm.evaluateFunctor(field.disabled, model)}
disabled={AutoForm.evaluateFunctor(field.disabled, model, false)}
/>
);
}
@ -190,11 +204,11 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
}}
onFinalize={onFinalize}
placeholder={field.placeholder}
suggestions={field.suggestions}
suggestions={AutoForm.evaluateFunctor(field.suggestions, model, undefined)}
large={large}
disabled={AutoForm.evaluateFunctor(field.disabled, model)}
disabled={AutoForm.evaluateFunctor(field.disabled, model, false)}
intent={
AutoForm.evaluateFunctor(field.required, model) && modelValue == null
AutoForm.evaluateFunctor(field.required, model, false) && modelValue == null
? AutoForm.REQUIRED_INTENT
: undefined
}
@ -206,9 +220,9 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
const { model, large, onFinalize } = this.props;
const modelValue = deepGet(model as any, field.name);
const shownValue = modelValue == null ? field.defaultValue : modelValue;
const disabled = AutoForm.evaluateFunctor(field.disabled, model);
const disabled = AutoForm.evaluateFunctor(field.disabled, model, false);
const intent =
AutoForm.evaluateFunctor(field.required, model) && modelValue == null
AutoForm.evaluateFunctor(field.required, model, false) && modelValue == null
? AutoForm.REQUIRED_INTENT
: undefined;
@ -263,9 +277,9 @@ export class AutoForm<T extends Record<string, any>> extends React.PureComponent
}}
placeholder={field.placeholder}
large={large}
disabled={AutoForm.evaluateFunctor(field.disabled, model)}
disabled={AutoForm.evaluateFunctor(field.disabled, model, false)}
intent={
AutoForm.evaluateFunctor(field.required, model) && modelValue == null
AutoForm.evaluateFunctor(field.required, model, false) && modelValue == null
? AutoForm.REQUIRED_INTENT
: undefined
}

View File

@ -90,59 +90,54 @@ exports[`rule editor matches snapshot 1`] = `
>
<select>
<option
value="load"
value="loadForever"
>
Load
loadForever
</option>
<option
value="drop"
value="loadByInterval"
>
Drop
loadByInterval
</option>
<option
value="broadcast"
value="loadByPeriod"
>
Broadcast
</option>
</select>
<span
class="bp3-icon bp3-icon-double-caret-vertical"
icon="double-caret-vertical"
>
<svg
data-icon="double-caret-vertical"
height="16"
viewBox="0 0 16 16"
width="16"
>
<desc>
double-caret-vertical
</desc>
<path
d="M5 7h6a1.003 1.003 0 00.71-1.71l-3-3C8.53 2.11 8.28 2 8 2s-.53.11-.71.29l-3 3A1.003 1.003 0 005 7zm6 2H5a1.003 1.003 0 00-.71 1.71l3 3c.18.18.43.29.71.29s.53-.11.71-.29l3-3A1.003 1.003 0 0011 9z"
fill-rule="evenodd"
/>
</svg>
</span>
</div>
<div
class="bp3-html-select"
>
<select>
<option
value="Forever"
>
forever
loadByPeriod
</option>
<option
value="ByPeriod"
value="dropForever"
>
by period
dropForever
</option>
<option
value="ByInterval"
value="dropByInterval"
>
by interval
dropByInterval
</option>
<option
value="dropByPeriod"
>
dropByPeriod
</option>
<option
value="dropBeforeByPeriod"
>
dropBeforeByPeriod
</option>
<option
value="broadcastForever"
>
broadcastForever
</option>
<option
value="broadcastByInterval"
>
broadcastByInterval
</option>
<option
value="broadcastByPeriod"
>
broadcastByPeriod
</option>
</select>
<span

View File

@ -31,10 +31,7 @@
}
}
.by-period {
display: flex;
.bp3-input-group {
padding-right: 15px;
}
.include-future {
margin-left: 15px;
}
}

View File

@ -49,9 +49,6 @@ export const RuleEditor = React.memo(function RuleEditor(props: RuleEditorProps)
const [isOpen, setIsOpen] = useState(true);
if (!rule) return null;
const ruleLoadType = RuleUtil.getLoadType(rule);
const ruleTimeType = RuleUtil.getTimeType(rule);
function removeTier(key: string) {
const newTierReplicants = Object.assign({}, rule.tieredReplicants);
delete newTierReplicants[key];
@ -72,14 +69,12 @@ export const RuleEditor = React.memo(function RuleEditor(props: RuleEditorProps)
}
}
onChange(RuleUtil.changeTierReplication(rule, newTierName, 1));
onChange(RuleUtil.addTieredReplicant(rule, newTierName, 1));
}
function renderTiers() {
if (RuleUtil.getLoadType(rule) !== 'load') return null;
const tieredReplicants = rule.tieredReplicants;
if (!tieredReplicants) return null;
if (!tieredReplicants) return;
const ruleTiers = Object.keys(tieredReplicants).sort();
return ruleTiers.map(tier => {
@ -92,7 +87,7 @@ export const RuleEditor = React.memo(function RuleEditor(props: RuleEditorProps)
value={tieredReplicants[tier]}
onValueChange={(v: number) => {
if (isNaN(v)) return;
onChange(RuleUtil.changeTierReplication(rule, tier, v));
onChange(RuleUtil.addTieredReplicant(rule, tier, v));
}}
min={1}
max={256}
@ -103,7 +98,9 @@ export const RuleEditor = React.memo(function RuleEditor(props: RuleEditorProps)
<HTMLSelect
fill
value={tier}
onChange={(e: any) => onChange(RuleUtil.changeTier(rule, tier, e.target.value))}
onChange={(e: any) =>
onChange(RuleUtil.renameTieredReplicants(rule, tier, e.target.value))
}
>
{tiers
.filter(t => t === tier || !tieredReplicants[t])
@ -127,7 +124,7 @@ export const RuleEditor = React.memo(function RuleEditor(props: RuleEditorProps)
function renderTierAdder() {
const { rule, tiers } = props;
if (Object.keys(rule.tieredReplicants || {}).length >= Object.keys(tiers).length) return null;
if (Object.keys(rule.tieredReplicants || {}).length >= Object.keys(tiers).length) return;
return (
<FormGroup className="right">
@ -138,18 +135,6 @@ export const RuleEditor = React.memo(function RuleEditor(props: RuleEditorProps)
);
}
function renderColocatedDataSources() {
const { rule, onChange } = props;
return (
<FormGroup label="Colocated datasources:">
<TagInput
values={rule.colocatedDataSources || []}
onChange={(v: any) => onChange(RuleUtil.changeColocatedDataSources(rule, v))}
fill
/>
</FormGroup>
);
}
return (
<div className="rule-editor">
<div className="title">
@ -172,52 +157,39 @@ export const RuleEditor = React.memo(function RuleEditor(props: RuleEditorProps)
<FormGroup>
<ControlGroup>
<HTMLSelect
value={ruleLoadType}
value={rule.type}
onChange={(e: any) =>
onChange(RuleUtil.changeLoadType(rule, e.target.value as any))
onChange(RuleUtil.changeRuleType(rule, e.target.value as any))
}
>
<option value="load">Load</option>
<option value="drop">Drop</option>
<option value="broadcast">Broadcast</option>
{RuleUtil.TYPES.map(type => {
return (
<option key={type} value={type}>
{type}
</option>
);
})}
</HTMLSelect>
<HTMLSelect
value={ruleTimeType}
onChange={(e: any) =>
onChange(RuleUtil.changeTimeType(rule, e.target.value as any))
}
>
<option value="Forever">forever</option>
<option value="ByPeriod">by period</option>
<option value="ByInterval">by interval</option>
</HTMLSelect>
{ruleTimeType === 'ByPeriod' && (
<div className={`by-period`}>
<InputGroup
value={rule.period || ''}
onChange={(e: any) =>
onChange(RuleUtil.changePeriod(rule, e.target.value as any))
}
placeholder="P1D"
/>
<Switch
large
checked={rule.includeFuture !== undefined ? rule.includeFuture : true}
label={`Include future`}
onChange={() => {
onChange(
RuleUtil.changeIncludeFuture(
rule,
rule.includeFuture !== undefined
? (!rule.includeFuture as boolean)
: false,
),
);
}}
/>
</div>
{RuleUtil.hasPeriod(rule) && (
<InputGroup
value={rule.period || ''}
onChange={(e: any) =>
onChange(RuleUtil.changePeriod(rule, e.target.value as any))
}
placeholder="P1D"
/>
)}
{ruleTimeType === 'ByInterval' && (
{RuleUtil.hasIncludeFuture(rule) && (
<Switch
className="include-future"
checked={rule.includeFuture || false}
label="Include future"
onChange={() => {
onChange(RuleUtil.changeIncludeFuture(rule, !rule.includeFuture));
}}
/>
)}
{RuleUtil.hasInterval(rule) && (
<InputGroup
value={rule.interval || ''}
onChange={(e: any) =>
@ -228,13 +200,21 @@ export const RuleEditor = React.memo(function RuleEditor(props: RuleEditorProps)
)}
</ControlGroup>
</FormGroup>
{ruleLoadType === 'load' && (
{RuleUtil.hasTieredReplicants(rule) && (
<FormGroup>
{renderTiers()}
{renderTierAdder()}
</FormGroup>
)}
{ruleLoadType === 'broadcast' && <FormGroup>{renderColocatedDataSources()}</FormGroup>}
{RuleUtil.hasColocatedDataSources(rule) && (
<FormGroup label="Colocated datasources">
<TagInput
values={rule.colocatedDataSources || []}
onChange={(v: any) => onChange(RuleUtil.changeColocatedDataSources(rule, v))}
fill
/>
</FormGroup>
)}
</Card>
</Collapse>
</div>

View File

@ -35,10 +35,12 @@ export interface SuggestionGroup {
suggestions: string[];
}
export type Suggestion = string | SuggestionGroup;
export interface SuggestibleInputProps extends HTMLInputProps {
onValueChange: (newValue: string) => void;
onFinalize?: () => void;
suggestions?: (string | SuggestionGroup)[];
suggestions?: Suggestion[];
large?: boolean;
intent?: Intent;
}

View File

@ -70,6 +70,7 @@ export class ConsoleApplication extends React.PureComponent<
private supervisorId?: string;
private taskId?: string;
private taskGroupId?: string;
private openDialog?: string;
private datasource?: string;
private onlyUnavailable?: boolean;
@ -109,6 +110,7 @@ export class ConsoleApplication extends React.PureComponent<
private resetInitialsWithDelay() {
setTimeout(() => {
this.taskId = undefined;
this.taskGroupId = undefined;
this.supervisorId = undefined;
this.openDialog = undefined;
this.datasource = undefined;
@ -138,8 +140,8 @@ export class ConsoleApplication extends React.PureComponent<
this.resetInitialsWithDelay();
};
private goToIngestionWithTaskId = (taskId?: string, openDialog?: string) => {
this.taskId = taskId;
private goToIngestionWithTaskGroupId = (taskGroupId?: string, openDialog?: string) => {
this.taskGroupId = taskGroupId;
if (openDialog) this.openDialog = openDialog;
window.location.hash = 'ingestion';
this.resetInitialsWithDelay();
@ -193,7 +195,7 @@ export class ConsoleApplication extends React.PureComponent<
initSupervisorId={this.supervisorId}
initTaskId={this.taskId}
exampleManifestsUrl={exampleManifestsUrl}
goToTask={this.goToIngestionWithTaskId}
goToIngestion={this.goToIngestionWithTaskGroupId}
/>,
'narrow-pad',
);
@ -235,7 +237,7 @@ export class ConsoleApplication extends React.PureComponent<
return this.wrapInViewContainer(
'ingestion',
<IngestionView
taskId={this.taskId}
taskGroupId={this.taskGroupId}
datasourceId={this.datasource}
openDialog={this.openDialog}
goToDatasource={this.goToDatasources}
@ -254,7 +256,7 @@ export class ConsoleApplication extends React.PureComponent<
<ServicesView
middleManager={this.middleManager}
goToQuery={this.goToQuery}
goToTask={this.goToIngestionWithTaskId}
goToTask={this.goToIngestionWithTaskGroupId}
capabilities={capabilities}
/>,
);

View File

@ -230,23 +230,21 @@ export const DOCTOR_CHECKS: DoctorCheck[] = [
try {
testSampledData = await postToSampler(
{
type: 'index',
type: 'index_parallel',
spec: {
type: 'index',
ioConfig: { type: 'index', firehose: { type: 'inline', data: '{"test":"Data"}' } },
type: 'index_parallel',
ioConfig: {
type: 'index_parallel',
inputSource: { type: 'inline', data: '{"test":"Data"}' },
inputFormat: { type: 'json' },
},
dataSchema: {
dataSource: 'sample',
parser: {
type: 'string',
parseSpec: {
format: 'json',
timestampSpec: {
column: '!!!_no_such_column_!!!',
missingValue: '2010-01-01T00:00:00Z',
},
dimensionsSpec: { dimensions: ['test'] },
},
timestampSpec: {
column: '!!!_no_such_column_!!!',
missingValue: '2010-01-01T00:00:00Z',
},
dimensionsSpec: { dimensions: ['test'] },
transformSpec: {},
metricsSpec: [],
granularitySpec: { queryGranularity: 'NONE' },

View File

@ -0,0 +1,76 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`ingestion-spec upgrades 1`] = `
Object {
"dataSchema": Object {
"dataSource": "wikipedia",
"dimensionsSpec": Object {
"dimensions": Array [
"channel",
"cityName",
"comment",
],
},
"granularitySpec": Object {
"queryGranularity": "HOUR",
"rollup": true,
"segmentGranularity": "DAY",
"type": "uniform",
},
"metricsSpec": Array [
Object {
"name": "count",
"type": "count",
},
Object {
"fieldName": "added",
"name": "sum_added",
"type": "longSum",
},
],
"timestampSpec": Object {
"column": "timestamp",
"format": "iso",
},
"transformSpec": Object {
"filter": Object {
"dimension": "commentLength",
"type": "selector",
"value": "35",
},
"transforms": Array [
Object {
"expression": "concat(\\"channel\\", 'lol')",
"name": "channel",
"type": "expression",
},
],
},
},
"ioConfig": Object {
"inputFormat": Object {
"flattenSpec": Object {
"fields": Array [
Object {
"expr": "$.cityName",
"name": "cityNameAlt",
"type": "path",
},
],
},
"type": "json",
},
"inputSource": Object {
"type": "http",
"uris": Array [
"https://static.imply.io/data/wikipedia.json.gz",
],
},
"type": "index_parallel",
},
"tuningConfig": Object {
"type": "index_parallel",
},
"type": "index_parallel",
}
`;

View File

@ -88,17 +88,13 @@ export function updateSchemaWithSample(
let newSpec = spec;
if (dimensionMode === 'auto-detect') {
newSpec = deepSet(newSpec, 'dataSchema.parser.parseSpec.dimensionsSpec.dimensions', []);
newSpec = deepSet(newSpec, 'dataSchema.dimensionsSpec.dimensions', []);
} else {
newSpec = deepDelete(newSpec, 'dataSchema.parser.parseSpec.dimensionsSpec.dimensionExclusions');
newSpec = deepDelete(newSpec, 'dataSchema.dimensionsSpec.dimensionExclusions');
const dimensions = getDimensionSpecs(headerAndRows, rollup);
if (dimensions) {
newSpec = deepSet(
newSpec,
'dataSchema.parser.parseSpec.dimensionsSpec.dimensions',
dimensions,
);
newSpec = deepSet(newSpec, 'dataSchema.dimensionsSpec.dimensions', dimensions);
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { downgradeSpec, upgradeSpec } from './ingestion-spec';
describe('ingestion-spec', () => {
const oldSpec = {
type: 'index_parallel',
ioConfig: {
type: 'index_parallel',
firehose: {
type: 'http',
uris: ['https://static.imply.io/data/wikipedia.json.gz'],
},
},
tuningConfig: {
type: 'index_parallel',
},
dataSchema: {
dataSource: 'wikipedia',
granularitySpec: {
type: 'uniform',
segmentGranularity: 'DAY',
queryGranularity: 'HOUR',
rollup: true,
},
parser: {
type: 'string',
parseSpec: {
format: 'json',
timestampSpec: {
column: 'timestamp',
format: 'iso',
},
dimensionsSpec: {
dimensions: ['channel', 'cityName', 'comment'],
},
flattenSpec: {
fields: [
{
type: 'path',
name: 'cityNameAlt',
expr: '$.cityName',
},
],
},
},
},
transformSpec: {
transforms: [
{
type: 'expression',
name: 'channel',
expression: 'concat("channel", \'lol\')',
},
],
filter: {
type: 'selector',
dimension: 'commentLength',
value: '35',
},
},
metricsSpec: [
{
name: 'count',
type: 'count',
},
{
name: 'sum_added',
type: 'longSum',
fieldName: 'added',
},
],
},
};
it('upgrades', () => {
expect(upgradeSpec(oldSpec)).toMatchSnapshot();
});
it('round trips', () => {
expect(downgradeSpec(upgradeSpec(oldSpec))).toMatchObject(oldSpec);
});
});

File diff suppressed because it is too large Load Diff

View File

@ -16,17 +16,22 @@
* limitations under the License.
*/
import { deepMove, deepSet } from './object-change';
export type RuleType =
| 'loadForever'
| 'loadByInterval'
| 'loadByPeriod'
| 'dropForever'
| 'dropByInterval'
| 'dropByPeriod'
| 'dropBeforeByPeriod'
| 'broadcastForever'
| 'broadcastByInterval'
| 'broadcastByPeriod';
export interface Rule {
type:
| 'loadForever'
| 'loadByInterval'
| 'loadByPeriod'
| 'dropForever'
| 'dropByInterval'
| 'dropByPeriod'
| 'broadcastForever'
| 'broadcastByInterval'
| 'broadcastByPeriod';
type: RuleType;
interval?: string;
period?: string;
includeFuture?: boolean;
@ -34,83 +39,97 @@ export interface Rule {
colocatedDataSources?: string[];
}
export type LoadType = 'load' | 'drop' | 'broadcast';
export type TimeType = 'Forever' | 'ByInterval' | 'ByPeriod';
export class RuleUtil {
static shouldIncludeFuture(rule: Rule): boolean {
if (rule.includeFuture !== false) {
return (
rule.type === 'loadByPeriod' ||
rule.type === 'dropByPeriod' ||
rule.type === 'broadcastByPeriod'
);
}
return false;
}
static TYPES: RuleType[] = [
'loadForever',
'loadByInterval',
'loadByPeriod',
'dropForever',
'dropByInterval',
'dropByPeriod',
'dropBeforeByPeriod',
'broadcastForever',
'broadcastByInterval',
'broadcastByPeriod',
];
static ruleToString(rule: Rule): string {
return (
rule.type +
(rule.period ? `(${rule.period})` : '') +
(rule.interval ? `(${rule.interval})` : '') +
(RuleUtil.shouldIncludeFuture(rule) ? `(includeFuture)` : '')
);
return [
rule.type,
rule.period ? `(${rule.period}${rule.includeFuture ? `+future` : ''})` : '',
rule.interval ? `(${rule.interval})` : '',
].join('');
}
static getLoadType(rule: Rule): LoadType {
const m = rule.type.match(/^(load|drop|broadcast)(\w+)$/);
if (!m) throw new Error(`unknown rule type: '${rule.type}'`);
return m[1] as any;
}
static changeRuleType(rule: Rule, type: RuleType): Rule {
const newRule = deepSet(rule, 'type', type);
static getTimeType(rule: Rule): TimeType {
const m = rule.type.match(/^(load|drop|broadcast)(\w+)$/);
if (!m) throw new Error(`unknown rule type: '${rule.type}'`);
return m[2] as any;
}
if (RuleUtil.hasPeriod(newRule)) {
if (!newRule.period) newRule.period = 'P1M';
} else {
delete newRule.period;
delete newRule.includeFuture;
}
if (RuleUtil.hasInterval(newRule)) {
if (!newRule.interval) newRule.interval = '2010-01-01/2020-01-01';
} else {
delete newRule.interval;
}
if (RuleUtil.hasTieredReplicants(newRule)) {
if (!newRule.tieredReplicants) newRule.tieredReplicants = { _default_tier: 2 };
} else {
delete newRule.tieredReplicants;
}
if (!RuleUtil.hasColocatedDataSources(newRule)) delete newRule.colocatedDataSources;
static changeLoadType(rule: Rule, loadType: LoadType): Rule {
const newRule = Object.assign({}, rule, { type: loadType + RuleUtil.getTimeType(rule) });
if (loadType !== 'load') delete newRule.tieredReplicants;
if (loadType !== 'broadcast') delete newRule.colocatedDataSources;
return newRule;
}
static changeTimeType(rule: Rule, timeType: TimeType): Rule {
const newRule = Object.assign({}, rule, { type: RuleUtil.getLoadType(rule) + timeType });
if (timeType !== 'ByPeriod') delete newRule.period;
if (timeType !== 'ByInterval') delete newRule.interval;
return newRule;
static hasPeriod(rule: Rule): boolean {
return rule.type.endsWith('ByPeriod');
}
static changePeriod(rule: Rule, period: string): Rule {
return Object.assign({}, rule, { period });
return deepSet(rule, 'period', period);
}
static hasIncludeFuture(rule: Rule): boolean {
return RuleUtil.hasPeriod(rule) && rule.type !== 'dropBeforeByPeriod';
}
static changeIncludeFuture(rule: Rule, includeFuture: boolean): Rule {
return Object.assign({}, rule, { includeFuture });
return deepSet(rule, 'includeFuture', includeFuture);
}
static hasInterval(rule: Rule): boolean {
return rule.type.endsWith('ByInterval');
}
static changeInterval(rule: Rule, interval: string): Rule {
return Object.assign({}, rule, { interval });
return deepSet(rule, 'interval', interval);
}
static changeTier(rule: Rule, oldTier: string, newTier: string): Rule {
const newRule = Object.assign({}, rule);
newRule.tieredReplicants = Object.assign({}, newRule.tieredReplicants);
newRule.tieredReplicants[newTier] = newRule.tieredReplicants[oldTier];
delete newRule.tieredReplicants[oldTier];
return newRule;
static hasTieredReplicants(rule: Rule): boolean {
return rule.type.startsWith('load');
}
static changeTierReplication(rule: Rule, tier: string, replication: number): Rule {
const newRule = Object.assign({}, rule);
newRule.tieredReplicants = Object.assign({}, newRule.tieredReplicants, { [tier]: replication });
return newRule;
static renameTieredReplicants(rule: Rule, oldTier: string, newTier: string): Rule {
return deepMove(rule, `tieredReplicants.${oldTier}`, `tieredReplicants.${newTier}`);
}
static addTieredReplicant(rule: Rule, tier: string, replication: number): Rule {
const newTieredReplicants = deepSet(rule.tieredReplicants || {}, tier, replication);
return deepSet(rule, 'tieredReplicants', newTieredReplicants);
}
static hasColocatedDataSources(rule: Rule): boolean {
return rule.type.startsWith('broadcast');
}
static changeColocatedDataSources(rule: Rule, colocatedDataSources: string[]): Rule {
return Object.assign({}, rule, { colocatedDataSources });
return deepSet(rule, 'colocatedDataSources', colocatedDataSources);
}
}

View File

@ -111,6 +111,16 @@ export function deepDelete<T extends Record<string, any>>(value: T, path: string
return valueCopy;
}
export function deepMove<T extends Record<string, any>>(
value: T,
fromPath: string,
toPath: string,
): T {
value = deepSet(value, toPath, deepGet(value, fromPath));
value = deepDelete(value, fromPath);
return value;
}
export function deepExtend<T extends Record<string, any>>(target: T, diff: Record<string, any>): T {
if (typeof target !== 'object') throw new TypeError(`Invalid target`);
if (typeof diff !== 'object') throw new TypeError(`Invalid diff`);

View File

@ -22,25 +22,24 @@ import { getDruidErrorMessage, queryDruidRune } from './druid-query';
import { alphanumericCompare, filterMap, sortWithPrefixSuffix } from './general';
import {
DimensionsSpec,
getEmptyTimestampSpec,
getDummyTimestampSpec,
getSpecType,
IngestionSpec,
IngestionType,
InputFormat,
IoConfig,
isColumnTimestampSpec,
isIngestSegment,
isDruidSource,
MetricSpec,
Parser,
ParseSpec,
TimestampSpec,
Transform,
TransformSpec,
upgradeSpec,
} from './ingestion-spec';
import { deepGet, deepSet, whitelistKeys } from './object-change';
const MS_IN_HOUR = 60 * 60 * 1000;
import { deepGet, deepSet } from './object-change';
const SAMPLER_URL = `/druid/indexer/v1/sampler`;
const BASE_SAMPLER_CONFIG: SamplerConfig = {
// skipCache: true,
numRows: 500,
timeoutMs: 15000,
};
@ -54,25 +53,23 @@ export interface SampleSpec {
export interface SamplerConfig {
numRows?: number;
timeoutMs?: number;
cacheKey?: string;
skipCache?: boolean;
}
export interface SampleResponse {
cacheKey?: string;
data: SampleEntry[];
}
export type CacheRows = Record<string, any>[];
export interface SampleResponseWithExtraInfo extends SampleResponse {
queryGranularity?: any;
timestampSpec?: any;
rollup?: boolean;
columns?: Record<string, any>;
aggregators?: Record<string, any>;
}
export interface SampleEntry {
raw: string;
input: Record<string, any>;
parsed?: Record<string, any>;
unparseable?: boolean;
error?: string;
@ -101,12 +98,35 @@ function dedupe(xs: string[]): string[] {
});
}
type SamplerType = 'index' | 'kafka' | 'kinesis';
export function getCacheRowsFromSampleResponse(
sampleResponse: SampleResponse,
useParsed = false,
): CacheRows {
const key = useParsed ? 'parsed' : 'input';
return filterMap(sampleResponse.data, d => d[key]).slice(0, 20);
}
export function getSamplerType(spec: IngestionSpec): SamplerType {
const specType = getSpecType(spec);
if (specType === 'kafka' || specType === 'kinesis') return specType;
return 'index';
export function applyCache(sampleSpec: SampleSpec, cacheRows: CacheRows) {
if (!cacheRows) return sampleSpec;
// If this is already an inline spec there is nothing to do
if (deepGet(sampleSpec, 'spec.ioConfig.inputSource.type') === 'inline') return sampleSpec;
// Make the spec into an inline json spec
sampleSpec = deepSet(sampleSpec, 'type', 'index');
sampleSpec = deepSet(sampleSpec, 'spec.type', 'index');
sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.type', 'index');
sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.inputSource', {
type: 'inline',
data: cacheRows.map(r => JSON.stringify(r)).join('\n'),
});
const flattenSpec = deepGet(sampleSpec, 'spec.ioConfig.inputFormat.flattenSpec');
const inputFormat: InputFormat = { type: 'json' };
if (flattenSpec) inputFormat.flattenSpec = flattenSpec;
sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.inputFormat', inputFormat);
return sampleSpec;
}
export function headerFromSampleResponse(
@ -140,7 +160,7 @@ export function headerAndRowsFromSampleResponse(
};
}
export async function getOverlordModules(): Promise<string[]> {
export async function getProxyOverlordModules(): Promise<string[]> {
let statusResp: any;
try {
statusResp = await axios.get(`/proxy/overlord/status`);
@ -155,6 +175,8 @@ export async function postToSampler(
sampleSpec: SampleSpec,
forStr: string,
): Promise<SampleResponse> {
sampleSpec = fixSamplerTypes(sampleSpec);
let sampleResp: any;
try {
sampleResp = await axios.post(`${SAMPLER_URL}?for=${forStr}`, sampleSpec);
@ -169,77 +191,72 @@ export type SampleStrategy = 'start' | 'end';
function makeSamplerIoConfig(
ioConfig: IoConfig,
samplerType: SamplerType,
specType: IngestionType,
sampleStrategy: SampleStrategy,
): IoConfig {
ioConfig = deepSet(ioConfig || {}, 'type', samplerType);
if (samplerType === 'kafka') {
ioConfig = deepSet(ioConfig || {}, 'type', specType);
if (specType === 'kafka') {
ioConfig = deepSet(ioConfig, 'useEarliestOffset', sampleStrategy === 'start');
} else if (samplerType === 'kinesis') {
} else if (specType === 'kinesis') {
ioConfig = deepSet(ioConfig, 'useEarliestSequenceNumber', sampleStrategy === 'start');
}
return ioConfig;
}
/**
* This function scopes down the interval of an ingestSegment firehose for the data sampler
* this is needed because the ingestSegment firehose gets the interval you are sampling over,
* looks up the corresponding segments and segment locations from metadata store, downloads
* every segment from deep storage to disk, and then maps all the segments into memory;
* and this happens in the constructor before the timer thread is even created meaning the sampler
* will time out on a larger interval.
* This is essentially a workaround for https://github.com/apache/incubator-druid/issues/8448
* @param ioConfig The IO Config to scope down the interval of
This is a hack to deal with the fact that the sampler can not deal with the index_parallel type
*/
export async function scopeDownIngestSegmentFirehoseIntervalIfNeeded(
ioConfig: IoConfig,
): Promise<IoConfig> {
if (deepGet(ioConfig, 'firehose.type') !== 'ingestSegment') return ioConfig;
const interval = deepGet(ioConfig, 'firehose.interval');
const intervalParts = interval.split('/');
const start = new Date(intervalParts[0]);
if (isNaN(start.valueOf())) throw new Error(`could not decode interval start`);
const end = new Date(intervalParts[1]);
if (isNaN(end.valueOf())) throw new Error(`could not decode interval end`);
function fixSamplerTypes(sampleSpec: SampleSpec): SampleSpec {
let samplerType: string = getSpecType(sampleSpec.spec);
if (samplerType === 'index_parallel') {
samplerType = 'index';
}
// Less than or equal to 1 hour so there is no need to adjust intervals
if (Math.abs(end.valueOf() - start.valueOf()) <= MS_IN_HOUR) return ioConfig;
sampleSpec = deepSet(sampleSpec, 'type', samplerType);
sampleSpec = deepSet(sampleSpec, 'spec.type', samplerType);
sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.type', samplerType);
sampleSpec = deepSet(sampleSpec, 'spec.tuningConfig.type', samplerType);
return sampleSpec;
}
const dataSourceMetadataResponse = await queryDruidRune({
queryType: 'dataSourceMetadata',
dataSource: deepGet(ioConfig, 'firehose.dataSource'),
});
function cleanupQueryGranularity(queryGranularity: any): any {
let queryGranularityType = deepGet(queryGranularity, 'type');
if (typeof queryGranularityType !== 'string') return queryGranularity;
queryGranularityType = queryGranularityType.toUpperCase();
const maxIngestedEventTime = new Date(
deepGet(dataSourceMetadataResponse, '0.result.maxIngestedEventTime'),
);
const knownGranularity = [
'NONE',
'SECOND',
'MINUTE',
'HOUR',
'DAY',
'WEEK',
'MONTH',
'YEAR',
].includes(queryGranularityType);
// If invalid maxIngestedEventTime do nothing
if (isNaN(maxIngestedEventTime.valueOf())) return ioConfig;
// If maxIngestedEventTime is before the start of the interval do nothing
if (maxIngestedEventTime < start) return ioConfig;
const newEnd = maxIngestedEventTime < end ? maxIngestedEventTime : end;
const newStart = new Date(newEnd.valueOf() - MS_IN_HOUR); // Set start to 1 hour ago
return deepSet(
ioConfig,
'firehose.interval',
`${newStart.toISOString()}/${newEnd.toISOString()}`,
);
return knownGranularity ? queryGranularityType : queryGranularity;
}
export async function sampleForConnect(
spec: IngestionSpec,
sampleStrategy: SampleStrategy,
): Promise<SampleResponseWithExtraInfo> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded(
makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy),
const samplerType = getSpecType(spec);
let ioConfig: IoConfig = makeSamplerIoConfig(
deepGet(spec, 'ioConfig'),
samplerType,
sampleStrategy,
);
const ingestSegmentMode = isIngestSegment(spec);
const reingestMode = isDruidSource(spec);
if (!reingestMode) {
ioConfig = deepSet(ioConfig, 'inputFormat', {
type: 'regex',
pattern: '(.*)',
columns: ['raw'],
});
}
const sampleSpec: SampleSpec = {
type: samplerType,
@ -248,16 +265,8 @@ export async function sampleForConnect(
ioConfig,
dataSchema: {
dataSource: 'sample',
parser: {
type: 'string',
parseSpec: {
format: 'regex',
pattern: '(.*)',
columns: ['a'],
dimensionsSpec: {},
timestampSpec: getEmptyTimestampSpec(),
},
},
timestampSpec: getDummyTimestampSpec(),
dimensionsSpec: {},
},
} as any,
samplerConfig: BASE_SAMPLER_CONFIG,
@ -267,11 +276,11 @@ export async function sampleForConnect(
if (!samplerResponse.data.length) return samplerResponse;
if (ingestSegmentMode) {
if (reingestMode) {
const segmentMetadataResponse = await queryDruidRune({
queryType: 'segmentMetadata',
dataSource: deepGet(ioConfig, 'firehose.dataSource'),
intervals: [deepGet(ioConfig, 'firehose.interval')],
dataSource: deepGet(ioConfig, 'inputSource.dataSource'),
intervals: [deepGet(ioConfig, 'inputSource.interval')],
merge: true,
lenientAggregatorMerge: true,
analysisTypes: ['timestampSpec', 'queryGranularity', 'aggregators', 'rollup'],
@ -279,8 +288,9 @@ export async function sampleForConnect(
if (Array.isArray(segmentMetadataResponse) && segmentMetadataResponse.length === 1) {
const segmentMetadataResponse0 = segmentMetadataResponse[0];
samplerResponse.queryGranularity = segmentMetadataResponse0.queryGranularity;
samplerResponse.timestampSpec = segmentMetadataResponse0.timestampSpec;
samplerResponse.queryGranularity = cleanupQueryGranularity(
segmentMetadataResponse0.queryGranularity,
);
samplerResponse.rollup = segmentMetadataResponse0.rollup;
samplerResponse.columns = segmentMetadataResponse0.columns;
samplerResponse.aggregators = segmentMetadataResponse0.aggregators;
@ -295,35 +305,26 @@ export async function sampleForConnect(
export async function sampleForParser(
spec: IngestionSpec,
sampleStrategy: SampleStrategy,
cacheKey: string | undefined,
): Promise<SampleResponse> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded(
makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy),
const samplerType = getSpecType(spec);
const ioConfig: IoConfig = makeSamplerIoConfig(
deepGet(spec, 'ioConfig'),
samplerType,
sampleStrategy,
);
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
ioConfig,
dataSchema: {
dataSource: 'sample',
parser: {
type: parser.type,
parseSpec: (parser.parseSpec
? Object.assign({}, parser.parseSpec, {
dimensionsSpec: {},
timestampSpec: getEmptyTimestampSpec(),
})
: undefined) as any,
},
timestampSpec: getDummyTimestampSpec(),
dimensionsSpec: {},
},
},
samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, {
cacheKey,
}),
samplerConfig: BASE_SAMPLER_CONFIG,
};
return postToSampler(sampleSpec, 'parser');
@ -331,17 +332,10 @@ export async function sampleForParser(
export async function sampleForTimestamp(
spec: IngestionSpec,
sampleStrategy: SampleStrategy,
cacheKey: string | undefined,
cacheRows: CacheRows,
): Promise<SampleResponse> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded(
makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy),
);
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {};
const timestampSpec: ParseSpec =
deepGet(spec, 'dataSchema.parser.parseSpec.timestampSpec') || getEmptyTimestampSpec();
const samplerType = getSpecType(spec);
const timestampSpec: TimestampSpec = deepGet(spec, 'dataSchema.timestampSpec');
const columnTimestampSpec = isColumnTimestampSpec(timestampSpec);
// First do a query with a static timestamp spec
@ -349,26 +343,20 @@ export async function sampleForTimestamp(
type: samplerType,
spec: {
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
ioConfig: deepGet(spec, 'ioConfig'),
dataSchema: {
dataSource: 'sample',
parser: {
type: parser.type,
parseSpec: (parser.parseSpec
? Object.assign({}, parseSpec, {
dimensionsSpec: {},
timestampSpec: columnTimestampSpec ? getEmptyTimestampSpec() : timestampSpec,
})
: undefined) as any,
},
dimensionsSpec: {},
timestampSpec: columnTimestampSpec ? getDummyTimestampSpec() : timestampSpec,
},
},
samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, {
cacheKey,
}),
samplerConfig: BASE_SAMPLER_CONFIG,
};
const sampleColumns = await postToSampler(sampleSpecColumns, 'timestamp-columns');
const sampleColumns = await postToSampler(
applyCache(sampleSpecColumns, cacheRows),
'timestamp-columns',
);
// If we are not parsing a column then there is nothing left to do
if (!columnTimestampSpec) return sampleColumns;
@ -379,28 +367,19 @@ export async function sampleForTimestamp(
type: samplerType,
spec: {
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
ioConfig: deepGet(spec, 'ioConfig'),
dataSchema: {
dataSource: 'sample',
parser: {
type: parser.type,
parseSpec: Object.assign({}, parseSpec, {
dimensionsSpec: {},
}),
},
dimensionsSpec: {},
timestampSpec,
},
},
samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, {
cacheKey: sampleColumns.cacheKey || cacheKey,
}),
samplerConfig: BASE_SAMPLER_CONFIG,
};
const sampleTime = await postToSampler(sampleSpec, 'timestamp-time');
const sampleTime = await postToSampler(applyCache(sampleSpec, cacheRows), 'timestamp-time');
if (
sampleTime.cacheKey !== sampleColumns.cacheKey ||
sampleTime.data.length !== sampleColumns.data.length
) {
if (sampleTime.data.length !== sampleColumns.data.length) {
// If the two responses did not come from the same cache (or for some reason have different lengths) then
// just return the one with the parsed time column.
return sampleTime;
@ -420,16 +399,11 @@ export async function sampleForTimestamp(
export async function sampleForTransform(
spec: IngestionSpec,
sampleStrategy: SampleStrategy,
cacheKey: string | undefined,
cacheRows: CacheRows,
): Promise<SampleResponse> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded(
makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy),
);
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {};
const parserColumns: string[] = deepGet(parseSpec, 'columns') || [];
const samplerType = getSpecType(spec);
const inputFormatColumns: string[] = deepGet(spec, 'ioConfig.inputFormat.columns') || [];
const timestampSpec: TimestampSpec = deepGet(spec, 'dataSchema.timestampSpec');
const transforms: Transform[] = deepGet(spec, 'dataSchema.transformSpec.transforms') || [];
// Extra step to simulate auto detecting dimension with transforms
@ -439,29 +413,26 @@ export async function sampleForTransform(
type: samplerType,
spec: {
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
ioConfig: deepGet(spec, 'ioConfig'),
dataSchema: {
dataSource: 'sample',
parser: {
type: parser.type,
parseSpec: Object.assign({}, parseSpec, {
dimensionsSpec: {},
}),
},
timestampSpec,
dimensionsSpec: {},
},
},
samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, {
cacheKey,
}),
samplerConfig: BASE_SAMPLER_CONFIG,
};
const sampleResponseHack = await postToSampler(sampleSpecHack, 'transform-pre');
const sampleResponseHack = await postToSampler(
applyCache(sampleSpecHack, cacheRows),
'transform-pre',
);
specialDimensionSpec.dimensions = dedupe(
headerFromSampleResponse(
sampleResponseHack,
'__time',
['__time'].concat(parserColumns),
['__time'].concat(inputFormatColumns),
).concat(transforms.map(t => t.name)),
);
}
@ -470,40 +441,29 @@ export async function sampleForTransform(
type: samplerType,
spec: {
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
ioConfig: deepGet(spec, 'ioConfig'),
dataSchema: {
dataSource: 'sample',
parser: {
type: parser.type,
parseSpec: Object.assign({}, parseSpec, {
dimensionsSpec: specialDimensionSpec, // Hack Hack Hack
}),
},
timestampSpec,
dimensionsSpec: specialDimensionSpec, // Hack Hack Hack
transformSpec: {
transforms,
},
},
},
samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, {
cacheKey,
}),
samplerConfig: BASE_SAMPLER_CONFIG,
};
return postToSampler(sampleSpec, 'transform');
return postToSampler(applyCache(sampleSpec, cacheRows), 'transform');
}
export async function sampleForFilter(
spec: IngestionSpec,
sampleStrategy: SampleStrategy,
cacheKey: string | undefined,
cacheRows: CacheRows,
): Promise<SampleResponse> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded(
makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy),
);
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {};
const parserColumns: string[] = deepGet(parser, 'columns') || [];
const samplerType = getSpecType(spec);
const inputFormatColumns: string[] = deepGet(spec, 'ioConfig.inputFormat.columns') || [];
const timestampSpec: TimestampSpec = deepGet(spec, 'dataSchema.timestampSpec');
const transforms: Transform[] = deepGet(spec, 'dataSchema.transformSpec.transforms') || [];
const filter: any = deepGet(spec, 'dataSchema.transformSpec.filter');
@ -514,29 +474,26 @@ export async function sampleForFilter(
type: samplerType,
spec: {
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
ioConfig: deepGet(spec, 'ioConfig'),
dataSchema: {
dataSource: 'sample',
parser: {
type: parser.type,
parseSpec: Object.assign({}, parseSpec, {
dimensionsSpec: {},
}),
},
timestampSpec,
dimensionsSpec: {},
},
},
samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, {
cacheKey,
}),
samplerConfig: BASE_SAMPLER_CONFIG,
};
const sampleResponseHack = await postToSampler(sampleSpecHack, 'filter-pre');
const sampleResponseHack = await postToSampler(
applyCache(sampleSpecHack, cacheRows),
'filter-pre',
);
specialDimensionSpec.dimensions = dedupe(
headerFromSampleResponse(
sampleResponseHack,
'__time',
['__time'].concat(parserColumns),
['__time'].concat(inputFormatColumns),
).concat(transforms.map(t => t.name)),
);
}
@ -545,41 +502,32 @@ export async function sampleForFilter(
type: samplerType,
spec: {
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
ioConfig: deepGet(spec, 'ioConfig'),
dataSchema: {
dataSource: 'sample',
parser: {
type: parser.type,
parseSpec: Object.assign({}, parseSpec, {
dimensionsSpec: specialDimensionSpec, // Hack Hack Hack
}),
},
timestampSpec,
dimensionsSpec: specialDimensionSpec, // Hack Hack Hack
transformSpec: {
transforms,
filter,
},
},
},
samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, {
cacheKey,
}),
samplerConfig: BASE_SAMPLER_CONFIG,
};
return postToSampler(sampleSpec, 'filter');
return postToSampler(applyCache(sampleSpec, cacheRows), 'filter');
}
export async function sampleForSchema(
spec: IngestionSpec,
sampleStrategy: SampleStrategy,
cacheKey: string | undefined,
cacheRows: CacheRows,
): Promise<SampleResponse> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded(
makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy),
);
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
const samplerType = getSpecType(spec);
const timestampSpec: TimestampSpec = deepGet(spec, 'dataSchema.timestampSpec');
const transformSpec: TransformSpec =
deepGet(spec, 'dataSchema.transformSpec') || ({} as TransformSpec);
const dimensionsSpec: DimensionsSpec = deepGet(spec, 'dataSchema.dimensionsSpec');
const metricsSpec: MetricSpec[] = deepGet(spec, 'dataSchema.metricsSpec') || [];
const queryGranularity: string =
deepGet(spec, 'dataSchema.granularitySpec.queryGranularity') || 'NONE';
@ -588,56 +536,49 @@ export async function sampleForSchema(
type: samplerType,
spec: {
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
ioConfig: deepGet(spec, 'ioConfig'),
dataSchema: {
dataSource: 'sample',
parser: whitelistKeys(parser, ['type', 'parseSpec']) as Parser,
timestampSpec,
transformSpec,
metricsSpec,
granularitySpec: {
queryGranularity,
},
dimensionsSpec,
metricsSpec,
},
},
samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, {
cacheKey,
}),
samplerConfig: BASE_SAMPLER_CONFIG,
};
return postToSampler(sampleSpec, 'schema');
return postToSampler(applyCache(sampleSpec, cacheRows), 'schema');
}
export async function sampleForExampleManifests(
exampleManifestUrl: string,
): Promise<ExampleManifest[]> {
const sampleSpec: SampleSpec = {
type: 'index',
const exampleSpec: SampleSpec = {
type: 'index_parallel',
spec: {
type: 'index',
type: 'index_parallel',
ioConfig: {
type: 'index',
firehose: { type: 'http', uris: [exampleManifestUrl] },
type: 'index_parallel',
inputSource: { type: 'http', uris: [exampleManifestUrl] },
inputFormat: { type: 'tsv', findColumnsFromHeader: true },
},
dataSchema: {
dataSource: 'sample',
parser: {
type: 'string',
parseSpec: {
format: 'tsv',
timestampSpec: {
column: 'timestamp',
missingValue: '2010-01-01T00:00:00Z',
},
dimensionsSpec: {},
hasHeaderRow: true,
},
timestampSpec: {
column: 'timestamp',
missingValue: '2010-01-01T00:00:00Z',
},
dimensionsSpec: {},
},
},
samplerConfig: { numRows: 50, timeoutMs: 10000, skipCache: true },
samplerConfig: { numRows: 50, timeoutMs: 10000 },
};
const exampleData = await postToSampler(sampleSpec, 'example-manifest');
const exampleData = await postToSampler(exampleSpec, 'example-manifest');
return filterMap(exampleData.data, datum => {
const parsed = datum.parsed;
@ -658,7 +599,7 @@ export async function sampleForExampleManifests(
return {
name: parsed.name,
description: parsed.description,
spec,
spec: upgradeSpec(spec),
};
} else {
return;

View File

@ -26,10 +26,10 @@ export function computeFlattenPathsForData(
exprType: ExprType,
arrayHandling: ArrayHandling,
): FlattenField[] {
return computeFlattenExprsForData(data, exprType, arrayHandling).map((expr, i) => {
return computeFlattenExprsForData(data, exprType, arrayHandling).map(expr => {
return {
name: expr.replace(/^\$?\./, ''),
type: exprType,
name: `expr_${i}`,
expr,
};
});

View File

@ -25,27 +25,20 @@ import {
updateSchemaWithSample,
} from './druid-type';
import { IngestionSpec } from './ingestion-spec';
import {
getSamplerType,
headerFromSampleResponse,
sampleForConnect,
sampleForExampleManifests,
sampleForFilter,
sampleForParser,
sampleForSchema,
sampleForTimestamp,
sampleForTransform,
} from './sampler';
import { applyCache, headerFromSampleResponse } from './sampler';
describe('test-utils', () => {
const ingestionSpec = {
const ingestionSpec: IngestionSpec = {
type: 'index_parallel',
ioConfig: {
type: 'index_parallel',
firehose: {
inputSource: {
type: 'http',
uris: ['https://static.imply.io/data/wikipedia.json.gz'],
},
inputFormat: {
type: 'json',
},
},
tuningConfig: {
type: 'index_parallel',
@ -57,71 +50,122 @@ describe('test-utils', () => {
segmentGranularity: 'DAY',
queryGranularity: 'HOUR',
},
parser: {
type: 'string',
parseSpec: {
format: 'json',
timestampSpec: {
column: 'timestamp',
format: 'iso',
},
dimensionsSpec: {},
},
timestampSpec: {
column: 'timestamp',
format: 'iso',
},
dimensionsSpec: {},
},
};
it('spec-utils getSamplerType', () => {
expect(getSamplerType(ingestionSpec as IngestionSpec)).toMatchInlineSnapshot(`"index"`);
});
// const cacheRows: CacheRows = [{ make: 'Honda', model: 'Civic' }, { make: 'BMW', model: 'M3' }];
it('spec-utils headerFromSampleResponse', () => {
expect(headerFromSampleResponse({ cacheKey: 'abc123', data: [] })).toMatchInlineSnapshot(
`Array []`,
);
expect(headerFromSampleResponse({ data: [{ input: { a: 1 }, parsed: { a: 1 } }] }))
.toMatchInlineSnapshot(`
Array [
"a",
]
`);
});
it('spec-utils sampleForParser', () => {
it('spec-utils applyCache', () => {
expect(
sampleForParser(ingestionSpec as IngestionSpec, 'start', 'abc123'),
).toMatchInlineSnapshot(`Promise {}`);
});
it('spec-utils SampleSpec', () => {
expect(sampleForConnect(ingestionSpec as IngestionSpec, 'start')).toMatchInlineSnapshot(
`Promise {}`,
);
});
it('spec-utils sampleForTimestamp', () => {
expect(
sampleForTimestamp(ingestionSpec as IngestionSpec, 'start', 'abc123'),
).toMatchInlineSnapshot(`Promise {}`);
});
it('spec-utils sampleForTransform', () => {
expect(
sampleForTransform(ingestionSpec as IngestionSpec, 'start', 'abc123'),
).toMatchInlineSnapshot(`Promise {}`);
});
it('spec-utils sampleForFilter', () => {
expect(
sampleForFilter(ingestionSpec as IngestionSpec, 'start', 'abc123'),
).toMatchInlineSnapshot(`Promise {}`);
});
it('spec-utils sampleForSchema', () => {
expect(
sampleForSchema(ingestionSpec as IngestionSpec, 'start', 'abc123'),
).toMatchInlineSnapshot(`Promise {}`);
});
it('spec-utils sampleForExampleManifests', () => {
expect(sampleForExampleManifests('abc123')).toMatchInlineSnapshot(`Promise {}`);
applyCache(
{
type: 'index_parallel',
spec: ingestionSpec,
samplerConfig: {
numRows: 500,
timeoutMs: 15000,
},
},
[{ make: 'Honda', model: 'Accord' }, { make: 'Toyota', model: 'Prius' }],
),
).toMatchInlineSnapshot(`
Object {
"samplerConfig": Object {
"numRows": 500,
"timeoutMs": 15000,
},
"spec": Object {
"dataSchema": Object {
"dataSource": "wikipedia",
"dimensionsSpec": Object {},
"granularitySpec": Object {
"queryGranularity": "HOUR",
"segmentGranularity": "DAY",
"type": "uniform",
},
"timestampSpec": Object {
"column": "timestamp",
"format": "iso",
},
},
"ioConfig": Object {
"inputFormat": Object {
"type": "json",
},
"inputSource": Object {
"data": "{\\"make\\":\\"Honda\\",\\"model\\":\\"Accord\\"}
{\\"make\\":\\"Toyota\\",\\"model\\":\\"Prius\\"}",
"type": "inline",
},
"type": "index",
},
"tuningConfig": Object {
"type": "index_parallel",
},
"type": "index",
},
"type": "index",
}
`);
});
// it('spec-utils sampleForParser', async () => {
// expect(await sampleForParser(ingestionSpec, 'start', 'abc123')).toMatchInlineSnapshot(
// `Promise {}`,
// );
// });
//
// it('spec-utils SampleSpec', async () => {
// expect(await sampleForConnect(ingestionSpec, 'start')).toMatchInlineSnapshot(`Promise {}`);
// });
//
// it('spec-utils sampleForTimestamp', async () => {
// expect(await sampleForTimestamp(ingestionSpec, 'start', cacheRows)).toMatchInlineSnapshot();
// });
//
// it('spec-utils sampleForTransform', async () => {
// expect(await sampleForTransform(ingestionSpec, 'start', cacheRows)).toMatchInlineSnapshot();
// });
//
// it('spec-utils sampleForFilter', async () => {
// expect(await sampleForFilter(ingestionSpec, 'start', cacheRows)).toMatchInlineSnapshot();
// });
//
// it('spec-utils sampleForSchema', async () => {
// expect(await sampleForSchema(ingestionSpec, 'start', cacheRows)).toMatchInlineSnapshot();
// });
//
// it('spec-utils sampleForExampleManifests', async () => {
// expect(await sampleForExampleManifests('some url')).toMatchInlineSnapshot();
// });
});
describe('druid-type.ts', () => {
const ingestionSpec = {
const ingestionSpec: IngestionSpec = {
type: 'index_parallel',
ioConfig: {
type: 'index_parallel',
firehose: {
inputSource: {
type: 'http',
uris: ['https://static.imply.io/data/wikipedia.json.gz'],
},
inputFormat: {
type: 'json',
},
},
tuningConfig: {
type: 'index_parallel',
@ -133,27 +177,24 @@ describe('druid-type.ts', () => {
segmentGranularity: 'DAY',
queryGranularity: 'HOUR',
},
parser: {
type: 'string',
parseSpec: {
format: 'json',
timestampSpec: {
column: 'timestamp',
format: 'iso',
},
dimensionsSpec: {},
},
timestampSpec: {
column: 'timestamp',
format: 'iso',
},
dimensionsSpec: {},
},
};
it('spec-utils getSamplerType', () => {
it('spec-utils guessTypeFromSample', () => {
expect(guessTypeFromSample([])).toMatchInlineSnapshot(`"string"`);
});
it('spec-utils getColumnTypeFromHeaderAndRows', () => {
expect(
getColumnTypeFromHeaderAndRows({ header: ['header'], rows: [] }, 'header'),
).toMatchInlineSnapshot(`"string"`);
});
it('spec-utils getDimensionSpecs', () => {
expect(getDimensionSpecs({ header: ['header'], rows: [] }, true)).toMatchInlineSnapshot(`
Array [
@ -161,6 +202,7 @@ describe('druid-type.ts', () => {
]
`);
});
it('spec-utils getMetricSecs', () => {
expect(getMetricSecs({ header: ['header'], rows: [] })).toMatchInlineSnapshot(`
Array [
@ -171,18 +213,19 @@ describe('druid-type.ts', () => {
]
`);
});
it('spec-utils updateSchemaWithSample', () => {
expect(
updateSchemaWithSample(
ingestionSpec as IngestionSpec,
{ header: ['header'], rows: [] },
'specific',
true,
),
updateSchemaWithSample(ingestionSpec, { header: ['header'], rows: [] }, 'specific', true),
).toMatchInlineSnapshot(`
Object {
"dataSchema": Object {
"dataSource": "wikipedia",
"dimensionsSpec": Object {
"dimensions": Array [
"header",
],
},
"granularitySpec": Object {
"queryGranularity": "HOUR",
"rollup": true,
@ -195,24 +238,16 @@ describe('druid-type.ts', () => {
"type": "count",
},
],
"parser": Object {
"parseSpec": Object {
"dimensionsSpec": Object {
"dimensions": Array [
"header",
],
},
"format": "json",
"timestampSpec": Object {
"column": "timestamp",
"format": "iso",
},
},
"type": "string",
"timestampSpec": Object {
"column": "timestamp",
"format": "iso",
},
},
"ioConfig": Object {
"firehose": Object {
"inputFormat": Object {
"type": "json",
},
"inputSource": Object {
"type": "http",
"uris": Array [
"https://static.imply.io/data/wikipedia.json.gz",
@ -232,9 +267,11 @@ describe('druid-query.ts', () => {
it('spec-utils parseHtmlError', () => {
expect(parseHtmlError('<div></div>')).toMatchInlineSnapshot(`undefined`);
});
it('spec-utils parseHtmlError', () => {
expect(getDruidErrorMessage({})).toMatchInlineSnapshot(`undefined`);
});
it('spec-utils parseQueryPlan', () => {
expect(parseQueryPlan('start')).toMatchInlineSnapshot(`"start"`);
});

View File

@ -514,7 +514,7 @@ exports[`tasks view matches snapshot 1`] = `
filtered={
Array [
Object {
"id": "task_id",
"id": "group_id",
"value": "test",
},
Object {

View File

@ -28,7 +28,7 @@ describe('tasks view', () => {
const taskView = shallow(
<IngestionView
openDialog={'test'}
taskId={'test'}
taskGroupId={'test'}
datasourceId={'datasource'}
goToDatasource={() => {}}
goToQuery={() => {}}

View File

@ -102,7 +102,7 @@ interface TaskQueryResultRow {
}
export interface IngestionViewProps {
taskId: string | undefined;
taskGroupId: string | undefined;
datasourceId: string | undefined;
openDialog: string | undefined;
goToDatasource: (datasource: string) => void;
@ -216,7 +216,7 @@ ORDER BY "rank" DESC, "created_time" DESC`;
super(props, context);
const taskFilter: Filter[] = [];
if (props.taskId) taskFilter.push({ id: 'task_id', value: props.taskId });
if (props.taskGroupId) taskFilter.push({ id: 'group_id', value: props.taskGroupId });
if (props.datasourceId) taskFilter.push({ id: 'datasource', value: props.datasourceId });
const supervisorFilter: Filter[] = [];

View File

@ -59,12 +59,12 @@ exports[`load data view matches snapshot 1`] = `
</div>
<div
className="step-section"
key="Transform and configure schema"
key="Transform data and configure schema"
>
<div
className="step-nav-l1"
>
Transform and configure schema
Transform data and configure schema
</div>
<Blueprint3.ButtonGroup
className="step-nav-l2"

View File

@ -27,7 +27,7 @@ describe('filter table', () => {
header: ['c1'],
rows: [
{
raw: `{"c1":"hello"}`,
input: { c1: 'hello' },
parsed: { c1: 'hello' },
},
],

View File

@ -23,7 +23,7 @@ import { LoadDataView } from './load-data-view';
describe('load data view', () => {
it('matches snapshot', () => {
const loadDataView = shallow(<LoadDataView goToTask={() => {}} />);
const loadDataView = shallow(<LoadDataView goToIngestion={() => {}} />);
expect(loadDataView).toMatchSnapshot();
});
});

File diff suppressed because it is too large Load Diff

View File

@ -27,7 +27,7 @@ describe('parse data table', () => {
header: ['c1'],
rows: [
{
raw: `{"c1":"hello"}`,
input: { c1: 'hello' },
parsed: { c1: 'hello' },
},
],

View File

@ -22,7 +22,7 @@ import ReactTable from 'react-table';
import { TableCell } from '../../../components';
import { TableCellUnparseable } from '../../../components/table-cell-unparseable/table-cell-unparseable';
import { caseInsensitiveContains, filterMap, parseJson } from '../../../utils';
import { caseInsensitiveContains, filterMap } from '../../../utils';
import { FlattenField } from '../../../utils/ingestion-spec';
import { HeaderAndRows, SampleEntry } from '../../../utils/sampler';
@ -85,20 +85,16 @@ export const ParseDataTable = React.memo(function ParseDataTable(props: ParseDat
};
})}
SubComponent={rowInfo => {
const { raw, error } = rowInfo.original;
const parsedJson: any = parseJson(raw);
const { input, error } = rowInfo.original;
const inputStr = JSON.stringify(input, null, 2);
if (!error && parsedJson && canFlatten) {
return (
<pre className="parse-detail">
{'Original row: ' + JSON.stringify(parsedJson, null, 2)}
</pre>
);
if (!error && input && canFlatten) {
return <pre className="parse-detail">{'Original row: ' + inputStr}</pre>;
} else {
return (
<div className="parse-detail">
{error && <div className="parse-error">{error}</div>}
<div>{'Original row: ' + rowInfo.original.raw}</div>
<div>{'Original row: ' + inputStr}</div>
</div>
);
}

View File

@ -19,7 +19,7 @@
import { render } from '@testing-library/react';
import React from 'react';
import { getEmptyTimestampSpec } from '../../../utils/ingestion-spec';
import { getDummyTimestampSpec } from '../../../utils/ingestion-spec';
import { ParseTimeTable } from './parse-time-table';
@ -29,7 +29,7 @@ describe('parse time table', () => {
header: ['c1'],
rows: [
{
raw: `{"c1":"hello"}`,
input: { c1: 'hello' },
parsed: { c1: 'hello' },
},
],
@ -39,7 +39,7 @@ describe('parse time table', () => {
<ParseTimeTable
sampleBundle={{
headerAndRows: sampleData,
timestampSpec: getEmptyTimestampSpec(),
timestampSpec: getDummyTimestampSpec(),
}}
columnFilter=""
possibleTimestampColumnsOnly={false}

View File

@ -27,7 +27,7 @@ describe('schema table', () => {
header: ['c1'],
rows: [
{
raw: `{"c1":"hello"}`,
input: { c1: 'hello' },
parsed: { c1: 'hello' },
},
],

View File

@ -27,7 +27,7 @@ describe('transform table', () => {
header: ['c1'],
rows: [
{
raw: `{"c1":"hello"}`,
input: { c1: 'hello' },
parsed: { c1: 'hello' },
},
],

View File

@ -347,7 +347,7 @@ export class QueryView extends React.PureComponent<QueryViewProps, QueryViewStat
prettyPrintJson(): void {
this.setState(prevState => ({
queryString: Hjson.stringify(Hjson.parse(prevState.queryString)),
queryString: JSON.stringify(Hjson.parse(prevState.queryString), null, 2),
}));
}

View File

@ -142,7 +142,7 @@ export class RunButton extends React.PureComponent<RunButtonProps> {
/>
)}
{runeMode && (
<MenuItem icon={IconNames.PRINT} text="Pretty print JSON" onClick={onPrettier} />
<MenuItem icon={IconNames.ALIGN_LEFT} text="Prettify JSON" onClick={onPrettier} />
)}
</Menu>
);