Web console, adding Apache Kafka and AWS Kinesis to the data loader (#7643)

* adding kafka and kinesis to the data loader

* feature detect

* copy fixes

* wording fixes

* added missing spec type

* increase timeout

* Call it Google Cloud Storage
This commit is contained in:
Vadim Ogievetsky 2019-05-17 14:01:27 -07:00 committed by Clint Wylie
parent 94721de141
commit be16e4a4ae
14 changed files with 850 additions and 231 deletions

View File

@ -17,7 +17,7 @@
*/
import { InputGroup, ITagInputProps } from '@blueprintjs/core';
import { ITagInputProps, TextArea } from '@blueprintjs/core';
import * as React from 'react';
export interface ArrayInputProps extends ITagInputProps {
@ -46,13 +46,14 @@ export class ArrayInput extends React.Component<ArrayInputProps, { stringValue:
render() {
const { className, placeholder, large, disabled } = this.props;
const { stringValue } = this.state;
return <InputGroup
return <TextArea
className={className}
value={stringValue}
onChange={this.handleChange}
placeholder={placeholder}
large={large}
disabled={disabled}
fill
/>;
}
}

View File

@ -47,7 +47,7 @@ export interface Field<T> {
name: string;
label?: string;
info?: React.ReactNode;
type: 'number' | 'size-bytes' | 'string' | 'boolean' | 'string-array' | 'json';
type: 'number' | 'size-bytes' | 'string' | 'duration' | 'boolean' | 'string-array' | 'json';
defaultValue?: any;
isDefined?: (model: T) => boolean;
disabled?: boolean;
@ -140,7 +140,7 @@ export class AutoForm<T extends Record<string, any>> extends React.Component<Aut
/>;
}
private renderStringInput(field: Field<T>): JSX.Element {
private renderStringInput(field: Field<T>, sanitize?: (str: string) => string): JSX.Element {
const { model, large } = this.props;
const suggestionsMenu = field.suggestions ?
@ -178,7 +178,7 @@ export class AutoForm<T extends Record<string, any>> extends React.Component<Aut
value={deepGet(model as any, field.name) || field.defaultValue || ''}
onChange={(e: any) => {
const v = e.target.value;
this.fieldChange(field, v === '' ? undefined : v);
this.fieldChange(field, v === '' ? undefined : (sanitize ? sanitize(v) : v));
}}
placeholder={field.placeholder}
rightElement={
@ -252,6 +252,7 @@ export class AutoForm<T extends Record<string, any>> extends React.Component<Aut
case 'number': return this.renderNumberInput(field);
case 'size-bytes': return this.renderSizeBytesInput(field);
case 'string': return this.renderStringInput(field);
case 'duration': return this.renderStringInput(field, (str: string) => str.toUpperCase().replace(/[^0-9PYMDTHS.,]/g, ''));
case 'boolean': return this.renderBooleanInput(field);
case 'string-array': return this.renderStringArrayInput(field);
case 'json': return this.renderJSONInput(field);

View File

@ -44,7 +44,6 @@ import {
LEGACY_COORDINATOR_CONSOLE,
LEGACY_OVERLORD_CONSOLE
} from '../variables';
import { LoadDataViewSeed } from '../views/load-data-view';
import './header-bar.scss';
@ -53,7 +52,7 @@ export type HeaderActiveTab = null | 'load-data' | 'query' | 'datasources' | 'se
export interface HeaderBarProps extends React.Props<any> {
active: HeaderActiveTab;
hideLegacy: boolean;
goToLoadDataView: (loadDataViewSeed: LoadDataViewSeed) => void;
goToLoadDataView: () => void;
}
export interface HeaderBarState {
@ -160,15 +159,15 @@ export class HeaderBar extends React.Component<HeaderBarProps, HeaderBarState> {
minimal={!loadDataPrimary}
intent={loadDataPrimary ? Intent.PRIMARY : Intent.NONE}
/>
<AnchorButton minimal active={active === 'query'} icon={IconNames.APPLICATION} text="Query" href="#query" />
<NavbarDivider/>
<AnchorButton minimal active={active === 'datasources'} icon={IconNames.MULTI_SELECT} text="Datasources" href="#datasources" />
<AnchorButton minimal active={active === 'segments'} icon={IconNames.STACKED_CHART} text="Segments" href="#segments" />
<AnchorButton minimal active={active === 'tasks'} icon={IconNames.GANTT_CHART} text="Tasks" href="#tasks" />
<AnchorButton minimal active={active === 'servers'} icon={IconNames.DATABASE} text="Data servers" href="#servers" />
<NavbarDivider/>
<AnchorButton minimal active={active === 'servers'} icon={IconNames.DATABASE} text="Data servers" href="#servers" />
<AnchorButton minimal active={active === 'query'} icon={IconNames.APPLICATION} text="Query" href="#query" />
</NavbarGroup>
<NavbarGroup align={Alignment.RIGHT}>

View File

@ -32,7 +32,7 @@ import { QueryManager } from './utils';
import { DRUID_DOCS_API, DRUID_DOCS_SQL } from './variables';
import { DatasourcesView } from './views/datasource-view';
import { HomeView } from './views/home-view';
import { LoadDataView, LoadDataViewSeed } from './views/load-data-view';
import { LoadDataView } from './views/load-data-view';
import { LookupsView } from './views/lookups-view';
import { SegmentsView } from './views/segments-view';
import { ServersView } from './views/servers-view';
@ -100,8 +100,9 @@ export class ConsoleApplication extends React.Component<ConsoleApplicationProps,
});
}
private loadDataViewSeed: LoadDataViewSeed | null;
private initSpec: any | null;
private taskId: string | null;
private openDialog: string | null;
private datasource: string | null;
private onlyUnavailable: boolean | null;
private initSql: string | null;
@ -150,8 +151,9 @@ export class ConsoleApplication extends React.Component<ConsoleApplicationProps,
private resetInitialsWithDelay() {
setTimeout(() => {
this.loadDataViewSeed = null;
this.initSpec = null;
this.taskId = null;
this.openDialog = null;
this.datasource = null;
this.onlyUnavailable = null;
this.initSql = null;
@ -159,14 +161,15 @@ export class ConsoleApplication extends React.Component<ConsoleApplicationProps,
}, 50);
}
private goToLoadDataView = (loadDataViewSeed?: LoadDataViewSeed) => {
if (loadDataViewSeed) this.loadDataViewSeed = loadDataViewSeed;
private goToLoadDataView = (initSpec?: any) => {
if (initSpec) this.initSpec = initSpec;
window.location.hash = 'load-data';
this.resetInitialsWithDelay();
}
private goToTask = (taskId: string | null) => {
private goToTask = (taskId: string | null, openDialog?: string) => {
this.taskId = taskId;
if (openDialog) this.openDialog = openDialog;
window.location.hash = 'tasks';
this.resetInitialsWithDelay();
}
@ -205,7 +208,7 @@ export class ConsoleApplication extends React.Component<ConsoleApplicationProps,
}
private wrappedLoadDataView = () => {
return this.wrapInViewContainer('load-data', <LoadDataView seed={this.loadDataViewSeed} goToTask={this.goToTask}/>, 'narrow-pad');
return this.wrapInViewContainer('load-data', <LoadDataView initSpec={this.initSpec} goToTask={this.goToTask}/>, 'narrow-pad');
}
private wrappedSqlView = () => {
@ -224,7 +227,7 @@ export class ConsoleApplication extends React.Component<ConsoleApplicationProps,
private wrappedTasksView = () => {
const { noSqlMode } = this.state;
return this.wrapInViewContainer('tasks', <TasksView taskId={this.taskId} goToSql={this.goToSql} goToMiddleManager={this.goToMiddleManager} goToLoadDataView={this.goToLoadDataView} noSqlMode={noSqlMode}/>, 'scrollable');
return this.wrapInViewContainer('tasks', <TasksView taskId={this.taskId} openDialog={this.openDialog} goToSql={this.goToSql} goToMiddleManager={this.goToMiddleManager} goToLoadDataView={this.goToLoadDataView} noSqlMode={noSqlMode}/>, 'scrollable');
}
private wrappedServersView = () => {

View File

@ -27,6 +27,7 @@ export function parseHtmlError(htmlStr: string): string | null {
return htmlStr
.substring(startIndex + 10, endIndex)
.replace(/&quot;/g, '"')
.replace(/&apos;/g, `'`)
.replace(/&gt;/g, '>');
}

View File

@ -207,6 +207,10 @@ export function parseStringToJSON(s: string): JSON | null {
}
}
/**
 * Returns a new array holding only the truthy entries of `xs`, in order.
 *
 * Note that the check is truthiness, not definedness: besides `null` and
 * `undefined`, other falsy values (`0`, `''`, `false`, `NaN`) are dropped too.
 */
export function selectDefined<T, Q>(xs: (Q | null | undefined)[]): Q[] {
  const truthyOnly: any = xs.filter(x => Boolean(x));
  return truthyOnly;
}
/**
 * Maps every element of `xs` through `f` and keeps only the truthy results.
 *
 * `f` is handed to `Array.prototype.map` directly, so it receives the element
 * and its index. Falsy results (`null`, `undefined`, `0`, `''`, `false`) are
 * discarded from the returned array.
 */
export function filterMap<T, Q>(xs: T[], f: (x: T, i?: number) => Q | null | undefined): Q[] {
  const mapped: any[] = xs.map(f);
  return mapped.filter(Boolean);
}

View File

@ -113,6 +113,7 @@ export interface ParseSpec {
}
/**
 * Tells whether the spec can be switched between plain and parallel native
 * batch ingestion.
 *
 * Only the top-level `type` of the spec is consulted: the result is `true`
 * when it is `'index'` or `'index_parallel'`, and `false` otherwise.
 */
export function hasParallelAbility(spec: IngestionSpec): boolean {
  // The previously computed `specType` local was never read, so it is dropped.
  return spec.type === 'index' || spec.type === 'index_parallel';
}
@ -132,6 +133,10 @@ export function getRollup(spec: IngestionSpec): boolean {
return typeof specRollup === 'boolean' ? specRollup : true;
}
/**
 * Resolves the ingestion type of a spec by probing, in order, `type`,
 * `ioConfig.type` and `tuningConfig.type`, returning the first truthy value.
 * When none is truthy, whatever `tuningConfig.type` yields is returned.
 */
export function getSpecType(spec: IngestionSpec): IngestionType | undefined {
  const topLevelType = deepGet(spec, 'type');
  if (topLevelType) return topLevelType;

  const ioConfigType = deepGet(spec, 'ioConfig.type');
  if (ioConfigType) return ioConfigType;

  return deepGet(spec, 'tuningConfig.type');
}
export function changeParallel(spec: IngestionSpec, parallel: boolean): IngestionSpec {
if (!hasParallelAbility(spec)) return spec;
const newType = parallel ? 'index_parallel' : 'index';
@ -561,7 +566,7 @@ export interface IoConfig {
period?: string;
useEarliestOffset?: boolean;
stream?: string;
region?: string;
endpoint?: string;
useEarliestSequenceNumber?: boolean;
}
@ -597,7 +602,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
name: 'firehose.uris',
label: 'URIs',
type: 'string-array',
placeholder: 'https://example.com/path/to/file.ext',
placeholder: 'https://example.com/path/to/file1.ext, https://example.com/path/to/file2.ext',
info: <>
<p>The full URI of your file. To ingest from multiple URIs, use commas to separate each individual URI.</p>
</>
@ -636,7 +641,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
name: 'firehose.uris',
label: 'S3 URIs',
type: 'string-array',
placeholder: 's3://your-bucket/some-file.extension',
placeholder: 's3://your-bucket/some-file1.ext, s3://your-bucket/some-file2.ext',
isDefined: (ioConfig) => !deepGet(ioConfig, 'firehose.prefixes'),
info: <>
<p>The full S3 URI of your file. To ingest from multiple URIs, use commas to separate each individual URI.</p>
@ -647,7 +652,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
name: 'firehose.prefixes',
label: 'S3 prefixes',
type: 'string-array',
placeholder: 's3://your-bucket/some-path',
placeholder: 's3://your-bucket/some-path1, s3://your-bucket/some-path2',
isDefined: (ioConfig) => !deepGet(ioConfig, 'firehose.uris'),
info: <>
<p>A list of paths (with bucket) where your files are stored.</p>
@ -700,22 +705,62 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
return [
{
name: 'stream',
type: 'string'
type: 'string',
placeholder: 'your-kinesis-stream',
info: <>
The Kinesis stream to read.
</>
},
{
name: 'region',
type: 'string'
name: 'endpoint',
type: 'string',
defaultValue: 'kinesis.us-east-1.amazonaws.com',
suggestions: [
'kinesis.us-east-2.amazonaws.com',
'kinesis.us-east-1.amazonaws.com',
'kinesis.us-west-1.amazonaws.com',
'kinesis.us-west-2.amazonaws.com',
'kinesis.ap-east-1.amazonaws.com',
'kinesis.ap-south-1.amazonaws.com',
'kinesis.ap-northeast-3.amazonaws.com',
'kinesis.ap-northeast-2.amazonaws.com',
'kinesis.ap-southeast-1.amazonaws.com',
'kinesis.ap-southeast-2.amazonaws.com',
'kinesis.ap-northeast-1.amazonaws.com',
'kinesis.ca-central-1.amazonaws.com',
'kinesis.cn-north-1.amazonaws.com.cn',
'kinesis.cn-northwest-1.amazonaws.com.cn',
'kinesis.eu-central-1.amazonaws.com',
'kinesis.eu-west-1.amazonaws.com',
'kinesis.eu-west-2.amazonaws.com',
'kinesis.eu-west-3.amazonaws.com',
'kinesis.eu-north-1.amazonaws.com',
'kinesis.sa-east-1.amazonaws.com',
'kinesis.us-gov-east-1.amazonaws.com',
'kinesis.us-gov-west-1.amazonaws.com'
],
info: <>
The AWS Kinesis stream endpoint for a region.
You can find a list of endpoints <ExternalLink href="http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region">here</ExternalLink>.
</>
},
{
name: 'useEarliestOffset',
type: 'boolean',
defaultValue: true,
isDefined: (i: IoConfig) => i.type === 'kafka' || i.type === 'kinesis'
name: 'awsAssumedRoleArn',
label: 'AWS assumed role ARN',
type: 'string',
placeholder: 'optional',
info: <>
The AWS assumed role to use for additional permissions.
</>
},
{
name: 'useEarliestSequenceNumber',
type: 'boolean',
isDefined: (i: IoConfig) => i.type === 'kinesis'
name: 'awsExternalId',
label: 'AWS external ID',
type: 'string',
placeholder: 'optional',
info: <>
The AWS external id to use for additional permissions.
</>
}
];
}
@ -765,7 +810,7 @@ export function issueWithIoConfig(ioConfig: IoConfig | undefined): string | null
break;
case 'kinesis':
// if (!ioConfig.stream) return "must have a stream";
if (!ioConfig.stream) return 'must have a stream';
break;
}
@ -830,17 +875,174 @@ export function getIoConfigTuningFormFields(ingestionComboType: IngestionComboTy
return [];
case 'kafka':
return [
// ToDo: fill this in
];
case 'kinesis':
return [
// ToDo: fill this in
{
name: 'useEarliestOffset',
type: 'boolean',
defaultValue: false,
isDefined: (i: IoConfig) => i.type === 'kafka',
info: <>
<p>
If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka.
This flag determines whether it retrieves the earliest or latest offsets in Kafka.
Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run.
</p>
</>
},
{
name: 'skipOffsetGaps',
type: 'boolean',
defaultValue: false,
isDefined: (i: IoConfig) => i.type === 'kafka',
info: <>
<p>
Whether or not to allow gaps of missing offsets in the Kafka stream.
This is required for compatibility with implementations such as MapR Streams which does not guarantee consecutive offsets.
If this is false, an exception will be thrown if offsets are not consecutive.
</p>
</>
},
{
name: 'pollTimeout',
type: 'number',
defaultValue: 100,
isDefined: (i: IoConfig) => i.type === 'kafka',
info: <>
<p>The length of time to wait for the kafka consumer to poll records, in milliseconds.</p>
</>
},
{
name: 'useEarliestSequenceNumber',
type: 'boolean',
defaultValue: false,
isDefined: (i: IoConfig) => i.type === 'kinesis',
info: <>
If a supervisor is managing a dataSource for the first time, it will obtain a set of starting sequence numbers from Kinesis.
This flag determines whether it retrieves the earliest or latest sequence numbers in Kinesis.
Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run.
</>
},
{
name: 'recordsPerFetch',
type: 'number',
defaultValue: 2000,
isDefined: (i: IoConfig) => i.type === 'kinesis',
info: <>
The number of records to request per GetRecords call to Kinesis.
</>
},
{
name: 'fetchDelayMillis',
type: 'number',
defaultValue: 1000,
isDefined: (i: IoConfig) => i.type === 'kinesis',
info: <>
Time in milliseconds to wait between subsequent GetRecords calls to Kinesis.
</>
},
{
name: 'deaggregate',
type: 'boolean',
isDefined: (i: IoConfig) => i.type === 'kinesis',
info: <>
Whether to use the de-aggregate function of the KCL.
</>
},
{
name: 'replicas',
type: 'number',
defaultValue: 1,
info: <>
<p>The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.</p>
</>
},
{
name: 'taskCount',
type: 'number',
defaultValue: 1,
info: <>
<p>
The maximum number of reading tasks in a replica set.
This means that the maximum number of reading tasks will be <Code>taskCount * replicas</Code> and the total number of tasks (reading + publishing) will be higher than this. See 'Capacity Planning' below for more details.
</p>
</>
},
{
name: 'taskDuration',
type: 'duration',
defaultValue: 'PT1H',
info: <>
<p>
The length of time before tasks stop reading and begin publishing their segment.
</p>
</>
},
{
name: 'startDelay',
type: 'duration',
defaultValue: 'PT5S',
info: <>
<p>
The period to wait before the supervisor starts managing tasks.
</p>
</>
},
{
name: 'period',
type: 'duration',
defaultValue: 'PT30S',
info: <>
<p>
How often the supervisor will execute its management logic.
</p>
<p>
Note that the supervisor will also run in response to certain events (such as tasks succeeding, failing, and reaching their taskDuration) so this value specifies the maximum time between iterations.
</p>
</>
},
{
name: 'completionTimeout',
type: 'duration',
defaultValue: 'PT30M',
info: <>
<p>
The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish.
The publishing clock for a task begins roughly after taskDuration elapses.
</p>
</>
},
{
name: 'lateMessageRejectionPeriod',
type: 'string',
placeholder: '(none)',
info: <>
<p>
Configure tasks to reject messages with timestamps earlier than this period before the task was created;
for example if this is set to PT1H and the supervisor creates a task at 2016-01-01T12:00Z, messages with timestamps earlier than 2016-01-01T11:00Z will be dropped.
</p>
<p>
This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).
</p>
</>
},
{
name: 'earlyMessageRejectionPeriod',
type: 'string',
placeholder: '(none)',
info: <>
<p>
Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration;
for example if this is set to PT1H, the taskDuration is set to PT1H and the supervisor creates a task at 2016-01-01T12:00Z, messages with timestamps later than 2016-01-01T14:00Z will be dropped.
</p>
</>
}
];
}
throw new Error(`unknown input type ${ingestionComboType}`);
throw new Error(`unknown ingestion combo type ${ingestionComboType}`);
}
// ---------------------------------------
@ -906,9 +1108,116 @@ export interface TuningConfig {
reportParseExceptions?: boolean;
pushTimeout?: number;
segmentWriteOutMediumFactory?: any;
// ...
intermediateHandoffPeriod?: string;
handoffConditionTimeout?: number;
resetOffsetAutomatically?: boolean;
workerThreads?: number;
chatThreads?: number;
chatRetries?: number;
httpTimeout?: string;
shutdownTimeout?: string;
offsetFetchPeriod?: string;
maxParseExceptions?: number;
maxSavedParseExceptions?: number;
recordBufferSize?: number;
recordBufferOfferTimeout?: number;
recordBufferFullWait?: number;
fetchSequenceNumberTimeout?: number;
fetchThreads?: number;
}
/**
 * Builds the list of partition-related tuning config form fields for a spec type.
 *
 * - `'index'` / `'index_parallel'`: partitionDimensions, forceGuaranteedRollup,
 *   targetPartitionSize, numShards, maxRowsPerSegment and maxTotalRows.
 * - `'kafka'` / `'kinesis'`: maxRowsPerSegment and maxTotalRows only.
 *
 * @param specType the ingestion type to produce fields for
 * @returns the field definitions for that ingestion type
 * @throws Error when `specType` matches none of the cases above
 */
export function getPartitionRelatedTuningSpecFormFields(specType: IngestionType): Field<TuningConfig>[] {
switch (specType) {
case 'index':
case 'index_parallel':
// partitionDimensions and forceGuaranteedRollup are rendered disabled for 'index_parallel'
const myIsParallel = specType === 'index_parallel';
return [
{
name: 'partitionDimensions',
type: 'string-array',
disabled: myIsParallel,
info: <>
<p>
Does not currently work with parallel ingestion
</p>
<p>
The dimensions to partition on.
Leave blank to select all dimensions. Only used with forceGuaranteedRollup = true, will be ignored otherwise.
</p>
</>
},
{
name: 'forceGuaranteedRollup',
type: 'boolean',
disabled: myIsParallel,
info: <>
<p>
Does not currently work with parallel ingestion
</p>
<p>
Forces guaranteeing the perfect rollup.
The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased.
If this is set to true, the index task will read the entire input data twice: one for finding the optimal number of partitions per time chunk and one for generating segments.
</p>
</>
},
{
name: 'targetPartitionSize',
type: 'number',
info: <>
Target number of rows to include in a partition, should be a number that targets segments of 500MB~1GB.
</>
},
{
name: 'numShards',
type: 'number',
info: <>
Directly specify the number of shards to create.
If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.
</>
},
{
name: 'maxRowsPerSegment',
type: 'number',
defaultValue: 5000000,
info: <>
Determines how many rows are in each segment.
</>
},
{
name: 'maxTotalRows',
type: 'number',
defaultValue: 20000000,
info: <>
Total number of rows in segments waiting for being pushed.
</>
}
];
// The streaming types share the same two row-limit fields (same defaults as the batch case).
case 'kafka':
case 'kinesis':
return [
{
name: 'maxRowsPerSegment',
type: 'number',
defaultValue: 5000000,
info: <>
Determines how many rows are in each segment.
</>
},
{
name: 'maxTotalRows',
type: 'number',
defaultValue: 20000000,
info: <>
Total number of rows in segments waiting for being pushed.
</>
}
];
}
// Reached only when specType matched none of the cases above.
throw new Error(`unknown spec type ${specType}`);
}
const TUNING_CONFIG_FORM_FIELDS: Field<TuningConfig>[] = [
@ -928,18 +1237,37 @@ const TUNING_CONFIG_FORM_FIELDS: Field<TuningConfig>[] = [
Used in determining when intermediate persists to disk should occur.
</>
},
{
name: 'intermediatePersistPeriod',
type: 'duration',
defaultValue: 'PT10M',
isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis',
info: <>
The period that determines the rate at which intermediate persists occur.
</>
},
{
name: 'intermediateHandoffPeriod',
type: 'duration',
defaultValue: 'P2147483647D',
isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis',
info: <>
How often the tasks should hand off segments.
Handoff will happen either if maxRowsPerSegment or maxTotalRows is hit or every intermediateHandoffPeriod, whichever happens earlier.
</>
},
{
name: 'maxPendingPersists',
type: 'number'
type: 'number',
info: <>
Maximum number of persists that can be pending but not started.
If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes.
</>
},
{
name: 'forceExtendableShardSpecs',
type: 'boolean'
},
{
name: 'reportParseExceptions',
type: 'boolean'
},
{
name: 'pushTimeout',
type: 'number',
@ -978,7 +1306,7 @@ const TUNING_CONFIG_FORM_FIELDS: Field<TuningConfig>[] = [
},
{
name: 'chatHandlerTimeout',
type: 'string',
type: 'duration',
defaultValue: 'PT10S',
info: <>
Timeout for reporting the pushed segments in worker tasks.
@ -991,6 +1319,136 @@ const TUNING_CONFIG_FORM_FIELDS: Field<TuningConfig>[] = [
info: <>
Retries for reporting the pushed segments in worker tasks.
</>
},
{
name: 'handoffConditionTimeout',
type: 'number',
defaultValue: 0,
isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis',
info: <>
Milliseconds to wait for segment handoff.
0 means to wait forever.
</>
},
{
name: 'resetOffsetAutomatically',
type: 'boolean',
defaultValue: false,
isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis',
info: <>
Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition.
</>
},
{
name: 'workerThreads',
type: 'number',
placeholder: 'min(10, taskCount)',
isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis',
info: <>
The number of threads that will be used by the supervisor for asynchronous operations.
</>
},
{
name: 'chatThreads',
type: 'number',
placeholder: 'min(10, taskCount * replicas)',
isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis',
info: <>
The number of threads that will be used for communicating with indexing tasks.
</>
},
{
name: 'chatRetries',
type: 'number',
defaultValue: 8,
isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis',
info: <>
The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.
</>
},
{
name: 'httpTimeout',
type: 'duration',
defaultValue: 'PT10S',
isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis',
info: <>
How long to wait for a HTTP response from an indexing task.
</>
},
{
name: 'shutdownTimeout',
type: 'duration',
defaultValue: 'PT80S',
isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis',
info: <>
How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.
</>
},
{
name: 'offsetFetchPeriod',
type: 'duration',
defaultValue: 'PT30S',
isDefined: (t: TuningConfig) => t.type === 'kafka',
info: <>
How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag.
</>
},
{
name: 'recordBufferSize',
type: 'number',
defaultValue: 10000,
isDefined: (t: TuningConfig) => t.type === 'kinesis',
info: <>
Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.
</>
},
{
name: 'recordBufferOfferTimeout',
type: 'number',
defaultValue: 5000,
isDefined: (t: TuningConfig) => t.type === 'kinesis',
info: <>
Length of time in milliseconds to wait for space to become available in the buffer before timing out.
</>
},
{
name: 'recordBufferFullWait',
type: 'number',
defaultValue: 5000,
isDefined: (t: TuningConfig) => t.type === 'kinesis',
info: <>
Length of time in milliseconds to wait for the buffer to drain before attempting to fetch records from Kinesis again.
</>
},
{
name: 'fetchSequenceNumberTimeout',
type: 'number',
defaultValue: 60000,
isDefined: (t: TuningConfig) => t.type === 'kinesis',
info: <>
Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence number for a shard. Kinesis will not return the latest sequence number if no data is actively being written to that shard.
In this case, this fetch call will repeatedly timeout and retry until fresh data is written to the stream.
</>
},
{
name: 'fetchThreads',
type: 'number',
placeholder: 'max(1, {numProcessors} - 1)',
isDefined: (t: TuningConfig) => t.type === 'kinesis',
info: <>
Size of the pool of threads fetching data from Kinesis.
There is no benefit in having more threads than Kinesis shards.
</>
},
{
name: 'maxRecordsPerPoll',
type: 'number',
defaultValue: 100,
isDefined: (t: TuningConfig) => t.type === 'kinesis',
info: <>
The maximum number of records/events to be fetched from buffer per poll.
The actual maximum will be <Code>max(maxRecordsPerPoll, max(bufferSize, 1))</Code>.
</>
}
];
@ -1012,12 +1470,14 @@ export interface Bitmap {
// --------------
export function getBlankSpec(ingestionType: IngestionType = 'index', firehoseType: string | null = null): IngestionSpec {
const ioAndTuningConfigType = ingestionTypeToIoAndTuningConfigType(ingestionType);
export function getBlankSpec(comboType: IngestionComboType): IngestionSpec {
let [ingestionType, firehoseType] = comboType.split(':');
if (ingestionType === 'index') ingestionType = 'index_parallel';
const ioAndTuningConfigType = ingestionTypeToIoAndTuningConfigType(ingestionType as IngestionType);
const granularitySpec: GranularitySpec = {
type: 'uniform',
segmentGranularity: ['index', 'index_parallel'].includes(ingestionType) ? 'DAY' : 'HOUR',
segmentGranularity: ingestionType === 'index_parallel' ? 'DAY' : 'HOUR',
queryGranularity: 'HOUR'
};

View File

@ -22,7 +22,7 @@ import { getDruidErrorMessage } from './druid-query';
import { filterMap, sortWithPrefixSuffix } from './general';
import {
DimensionsSpec,
getEmptyTimestampSpec,
getEmptyTimestampSpec, getSpecType,
IngestionSpec,
IoConfig, MetricSpec,
Parser,
@ -35,17 +35,19 @@ import { QueryState } from './query-state';
const SAMPLER_URL = `/druid/indexer/v1/sampler`;
const BASE_SAMPLER_CONFIG: SamplerConfig = {
// skipCache: true,
numRows: 500
numRows: 500,
timeoutMs: 15000
};
export interface SampleSpec {
type: 'index';
type: string;
spec: IngestionSpec;
samplerConfig: SamplerConfig;
}
/**
 * Options carried in the `samplerConfig` property of a sampler request.
 * Every field is optional.
 */
export interface SamplerConfig {
numRows?: number;
// NOTE(review): presumably a timeout in milliseconds, going by the name — confirm against the sampler API
timeoutMs?: number;
cacheKey?: string;
skipCache?: boolean;
}
@ -79,6 +81,14 @@ function dedupe(xs: string[]): string[] {
});
}
type SamplerType = 'index' | 'kafka' | 'kinesis';
/**
 * Picks the sampler type to use for a spec: `'kafka'` or `'kinesis'` when the
 * spec type (as resolved by `getSpecType`) is one of those, `'index'` for
 * anything else.
 */
export function getSamplerType(spec: IngestionSpec): SamplerType {
  switch (getSpecType(spec)) {
    case 'kafka':
      return 'kafka';
    case 'kinesis':
      return 'kinesis';
    default:
      return 'index';
  }
}
export function headerFromSampleResponse(sampleResponse: SampleResponse, ignoreColumn?: string): string[] {
let columns = sortWithPrefixSuffix(dedupe(
[].concat(...(filterMap(sampleResponse.data, s => s.parsed ? Object.keys(s.parsed) : null) as any))
@ -98,6 +108,17 @@ export function headerAndRowsFromSampleResponse(sampleResponse: SampleResponse,
};
}
/**
 * Requests `/proxy/overlord/status` and returns the `artifact` of every entry
 * in the response's `modules` list.
 *
 * @throws Error carrying `getDruidErrorMessage(e)` when the HTTP request fails
 */
export async function getOverlordModules(): Promise<string[]> {
  // Only the request itself is wrapped; errors from reading the payload propagate unchanged.
  const fetchStatus = async (): Promise<any> => {
    try {
      return await axios.get(`/proxy/overlord/status`);
    } catch (e) {
      throw new Error(getDruidErrorMessage(e));
    }
  };

  const statusResp = await fetchStatus();
  const loadedModules: any[] = statusResp.data.modules;
  return loadedModules.map((loadedModule: any) => loadedModule.artifact);
}
async function postToSampler(sampleSpec: SampleSpec, forStr: string): Promise<SampleResponse> {
let sampleResp: any;
try {
@ -109,24 +130,40 @@ async function postToSampler(sampleSpec: SampleSpec, forStr: string): Promise<Sa
return sampleResp.data;
}
export async function sampleForConnect(spec: IngestionSpec): Promise<SampleResponse> {
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
export type SampleStrategy = 'start' | 'end';
/**
 * Derives the ioConfig to send to the sampler.
 *
 * The `type` is set to `samplerType` (starting from `{}` when `ioConfig` is
 * falsy). For `'kafka'` the `useEarliestOffset` flag, and for `'kinesis'` the
 * `useEarliestSequenceNumber` flag, is set to whether the strategy is `'start'`.
 */
function makeSamplerIoConfig(ioConfig: IoConfig, samplerType: SamplerType, sampleStrategy: SampleStrategy): IoConfig {
  const fromStart = sampleStrategy === 'start';
  const typedIoConfig = deepSet(ioConfig || {}, 'type', samplerType);

  switch (samplerType) {
    case 'kafka':
      return deepSet(typedIoConfig, 'useEarliestOffset', fromStart);
    case 'kinesis':
      return deepSet(typedIoConfig, 'useEarliestSequenceNumber', fromStart);
    default:
      return typedIoConfig;
  }
}
export async function sampleForConnect(spec: IngestionSpec, sampleStrategy: SampleStrategy): Promise<SampleResponse> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy);
const sampleSpec: SampleSpec = {
type: 'index',
type: samplerType,
spec: {
ioConfig: deepSet(ioConfig, 'type', 'index')
// dataSchema: {
// dataSource: 'sample',
// parser: {
// type: 'string',
// parseSpec: {
// format: 'json',
// dimensionsSpec: {},
// timestampSpec: getEmptyTimestampSpec()
// }
// }
// }
type: samplerType,
ioConfig,
dataSchema: {
dataSource: 'sample',
parser: {
type: 'string',
parseSpec: {
format: 'regex',
pattern: '(.*)',
columns: ['a'],
dimensionsSpec: {},
timestampSpec: getEmptyTimestampSpec()
}
}
}
} as any,
samplerConfig: BASE_SAMPLER_CONFIG
};
@ -134,14 +171,16 @@ export async function sampleForConnect(spec: IngestionSpec): Promise<SampleRespo
return postToSampler(sampleSpec, 'connect');
}
export async function sampleForParser(spec: IngestionSpec, cacheKey: string | undefined): Promise<SampleResponse> {
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
export async function sampleForParser(spec: IngestionSpec, sampleStrategy: SampleStrategy, cacheKey: string | undefined): Promise<SampleResponse> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy);
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
const sampleSpec: SampleSpec = {
type: 'index',
type: samplerType,
spec: {
ioConfig: deepSet(ioConfig, 'type', 'index'),
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
dataSchema: {
dataSource: 'sample',
parser: {
@ -165,15 +204,17 @@ export async function sampleForParser(spec: IngestionSpec, cacheKey: string | un
return postToSampler(sampleSpec, 'parser');
}
export async function sampleForTimestamp(spec: IngestionSpec, cacheKey: string | undefined): Promise<SampleResponse> {
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
export async function sampleForTimestamp(spec: IngestionSpec, sampleStrategy: SampleStrategy, cacheKey: string | undefined): Promise<SampleResponse> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy);
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {};
const sampleSpec: SampleSpec = {
type: 'index',
type: samplerType,
spec: {
ioConfig: deepSet(ioConfig, 'type', 'index'),
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
dataSchema: {
dataSource: 'sample',
parser: {
@ -192,8 +233,9 @@ export async function sampleForTimestamp(spec: IngestionSpec, cacheKey: string |
return postToSampler(sampleSpec, 'timestamp');
}
export async function sampleForTransform(spec: IngestionSpec, cacheKey: string | undefined): Promise<SampleResponse> {
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
export async function sampleForTransform(spec: IngestionSpec, sampleStrategy: SampleStrategy, cacheKey: string | undefined): Promise<SampleResponse> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy);
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {};
const transforms: Transform[] = deepGet(spec, 'dataSchema.transformSpec.transforms') || [];
@ -203,9 +245,10 @@ export async function sampleForTransform(spec: IngestionSpec, cacheKey: string |
if (transforms && transforms.length) {
const sampleSpecHack: SampleSpec = {
type: 'index',
type: samplerType,
spec: {
ioConfig: deepSet(ioConfig, 'type', 'index'),
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
dataSchema: {
dataSource: 'sample',
parser: {
@ -227,9 +270,10 @@ export async function sampleForTransform(spec: IngestionSpec, cacheKey: string |
}
const sampleSpec: SampleSpec = {
type: 'index',
type: samplerType,
spec: {
ioConfig: deepSet(ioConfig, 'type', 'index'),
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
dataSchema: {
dataSource: 'sample',
parser: {
@ -251,8 +295,9 @@ export async function sampleForTransform(spec: IngestionSpec, cacheKey: string |
return postToSampler(sampleSpec, 'transform');
}
export async function sampleForFilter(spec: IngestionSpec, cacheKey: string | undefined): Promise<SampleResponse> {
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
export async function sampleForFilter(spec: IngestionSpec, sampleStrategy: SampleStrategy, cacheKey: string | undefined): Promise<SampleResponse> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy);
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {};
const transforms: Transform[] = deepGet(spec, 'dataSchema.transformSpec.transforms') || [];
@ -263,9 +308,10 @@ export async function sampleForFilter(spec: IngestionSpec, cacheKey: string | un
if (transforms && transforms.length) {
const sampleSpecHack: SampleSpec = {
type: 'index',
type: samplerType,
spec: {
ioConfig: deepSet(ioConfig, 'type', 'index'),
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
dataSchema: {
dataSource: 'sample',
parser: {
@ -287,9 +333,10 @@ export async function sampleForFilter(spec: IngestionSpec, cacheKey: string | un
}
const sampleSpec: SampleSpec = {
type: 'index',
type: samplerType,
spec: {
ioConfig: deepSet(ioConfig, 'type', 'index'),
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
dataSchema: {
dataSource: 'sample',
parser: {
@ -312,17 +359,19 @@ export async function sampleForFilter(spec: IngestionSpec, cacheKey: string | un
return postToSampler(sampleSpec, 'filter');
}
export async function sampleForSchema(spec: IngestionSpec, cacheKey: string | undefined): Promise<SampleResponse> {
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
export async function sampleForSchema(spec: IngestionSpec, sampleStrategy: SampleStrategy, cacheKey: string | undefined): Promise<SampleResponse> {
const samplerType = getSamplerType(spec);
const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy);
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
const transformSpec: TransformSpec = deepGet(spec, 'dataSchema.transformSpec') || ({} as TransformSpec);
const metricsSpec: MetricSpec[] = deepGet(spec, 'dataSchema.metricsSpec') || [];
const queryGranularity: string = deepGet(spec, 'dataSchema.granularitySpec.queryGranularity') || 'NONE';
const sampleSpec: SampleSpec = {
type: 'index',
type: samplerType,
spec: {
ioConfig: deepSet(ioConfig, 'type', 'index'),
type: samplerType,
ioConfig: deepSet(ioConfig, 'type', samplerType),
dataSchema: {
dataSource: 'sample',
parser: whitelistKeys(parser, ['type', 'parseSpec']) as Parser,

View File

@ -402,6 +402,8 @@ GROUP BY 1`);
}
getDatasourceActions(datasource: string, disabled: boolean): BasicAction[] {
const { goToSql } = this.props;
if (disabled) {
return [
{
@ -418,6 +420,11 @@ GROUP BY 1`);
];
} else {
return [
{
icon: IconNames.APPLICATION,
title: 'Query with SQL',
onAction: () => goToSql(`SELECT * FROM "${datasource}"`)
},
{
icon: IconNames.EXPORT,
title: 'Reload data by interval',

View File

@ -38,14 +38,10 @@
font-size: 20px;
}
.section-title {
margin-bottom: 10px;
font-weight: bold;
}
.cards {
.bp3-card {
display: inline-block;
vertical-align: top;
width: 250px;
height: 140px;
margin-right: 15px;
@ -53,6 +49,10 @@
font-size: 24px;
text-align: center;
padding-top: 47px;
&.disabled {
opacity: 0.4;
}
}
}
}

View File

@ -22,7 +22,7 @@ import {
Button,
ButtonGroup, Callout, Card,
Classes, Code,
FormGroup, H5,
FormGroup, H5, HTMLSelect,
Icon, Intent, Popover, Switch, TextArea
} from '@blueprintjs/core';
import { IconNames } from '@blueprintjs/icons';
@ -54,24 +54,58 @@ import { escapeColumnName } from '../utils/druid-expression';
import { possibleDruidFormatForValues } from '../utils/druid-time';
import { updateSchemaWithSample } from '../utils/druid-type';
import {
changeParallel, DimensionMode,
DimensionSpec, DimensionsSpec, DruidFilter,
changeParallel,
DimensionMode,
DimensionSpec,
DimensionsSpec,
DruidFilter,
fillDataSourceName,
fillParser,
FlattenField, getBlankSpec, getDimensionMode,
FlattenField,
getBlankSpec,
getDimensionMode,
getDimensionSpecFormFields,
getDimensionSpecName, getDimensionSpecType, getEmptyTimestampSpec, getFilterFormFields, getFlattenFieldFormFields,
getIngestionComboType, getIoConfigFormFields, getIoConfigTuningFormFields, getMetricSpecFormFields,
getMetricSpecName, getParseSpecFormFields, getRollup, getTimestampSpecColumn, getTimestampSpecFormFields,
getDimensionSpecName,
getDimensionSpecType,
getEmptyTimestampSpec,
getFilterFormFields,
getFlattenFieldFormFields,
getIngestionComboType,
getIoConfigFormFields,
getIoConfigTuningFormFields,
getMetricSpecFormFields,
getMetricSpecName,
getParseSpecFormFields,
getPartitionRelatedTuningSpecFormFields,
getRollup,
getSpecType,
getTimestampSpecColumn,
getTimestampSpecFormFields,
getTransformFormFields,
getTuningSpecFormFields, GranularitySpec, hasParallelAbility, inflateDimensionSpec, IngestionSpec,
IngestionType, IoConfig,
isColumnTimestampSpec, isParallel, issueWithIoConfig, issueWithParser, joinFilter,
MetricSpec, Parser, ParseSpec,
parseSpecHasFlatten, splitFilter, TimestampSpec, Transform, TuningConfig
getTuningSpecFormFields,
GranularitySpec,
hasParallelAbility,
inflateDimensionSpec, IngestionComboType,
IngestionSpec,
IngestionType,
IoConfig,
isColumnTimestampSpec,
isParallel,
issueWithIoConfig,
issueWithParser,
joinFilter,
MetricSpec,
Parser,
ParseSpec,
parseSpecHasFlatten,
splitFilter,
TimestampSpec,
Transform,
TuningConfig
} from '../utils/ingestion-spec';
import { deepDelete, deepGet, deepSet } from '../utils/object-change';
import {
getOverlordModules,
HeaderAndRows,
headerAndRowsFromSampleResponse,
SampleEntry,
@ -80,16 +114,20 @@ import {
sampleForParser, sampleForSchema,
sampleForTimestamp,
sampleForTransform,
SampleResponse
SampleResponse, SampleStrategy
} from '../utils/sampler';
import { computeFlattenPathsForData } from '../utils/spec-utils';
import './load-data-view.scss';
export interface LoadDataViewSeed {
type?: IngestionType;
firehoseType?: string;
initSpec?: IngestionSpec;
/**
 * Formats one raw sampled row for display in the raw-lines preview.
 *
 * - A row containing a newline is replaced by a `<Multi-line row, length: N>`
 *   placeholder, where N is the row's full length.
 * - A row longer than 1000 characters is cut to its first 1000 characters
 *   followed by `...`.
 * - Any other row (including the empty string) is returned unchanged.
 *
 * @param line the raw row text
 * @returns the text to show for that row
 */
function showRawLine(line: string): string {
  const maxShownLength = 1000;

  if (line.includes('\n')) {
    return `<Multi-line row, length: ${line.length}>`;
  }

  if (line.length > maxShownLength) {
    // `slice` rather than the legacy `substr`; identical result for a 0-based prefix.
    return line.slice(0, maxShownLength) + '...';
  }

  return line;
}
function filterMatch(testString: string, searchString: string): boolean {
@ -136,8 +174,8 @@ const VIEW_TITLE: Record<Stage, string> = {
};
export interface LoadDataViewProps extends React.Props<any> {
seed: LoadDataViewSeed | null;
goToTask: (taskId: string | null) => void;
initSpec: IngestionSpec | null;
goToTask: (taskId: string | null, openDialog?: string) => void;
}
export interface LoadDataViewState {
@ -151,6 +189,9 @@ export interface LoadDataViewState {
newDimensionMode: DimensionMode | null;
// general
overlordModules: string[] | null;
overlordModuleNeededMessage: string | null;
sampleStrategy: SampleStrategy;
columnFilter: string;
specialColumnsOnly: boolean;
@ -191,7 +232,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
constructor(props: LoadDataViewProps) {
super(props);
let spec = parseJson(String(localStorageGet(LocalStorageKeys.INGESTION_SPEC)));
let spec = props.initSpec || parseJson(String(localStorageGet(LocalStorageKeys.INGESTION_SPEC)));
if (!spec || typeof spec !== 'object') spec = {};
this.state = {
@ -205,6 +246,9 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
newDimensionMode: null,
// general
overlordModules: null,
overlordModuleNeededMessage: null,
sampleStrategy: 'start',
columnFilter: '',
specialColumnsOnly: false,
@ -243,9 +287,26 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
}
// On mount: start fetching the overlord module list (the returned promise is
// not awaited) and switch the view to the 'connect' stage.
componentDidMount(): void {
this.getOverlordModules();
this.updateStage('connect');
}
/**
 * Fetches the overlord module list via the imported `getOverlordModules`
 * helper and stores it in `state.overlordModules`.
 *
 * If the fetch fails, a danger toast with the error message is shown and
 * `state.overlordModules` is set to an empty array instead.
 */
async getOverlordModules() {
  let fetchedModules: string[];

  try {
    fetchedModules = await getOverlordModules();
  } catch (err) {
    const failureMessage = `Failed to get overlord modules: ${err.message}`;
    AppToaster.show({ message: failureMessage, intent: Intent.DANGER });
    // Fall back to an empty list so the state is no longer null.
    this.setState({ overlordModules: [] });
    return;
  }

  this.setState({ overlordModules: fetchedModules });
}
private updateStage = (newStage: Stage) => {
this.doQueryForStage(newStage);
this.setState({ stage: newStage });
@ -360,48 +421,70 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
// ==================================================================
initWith(seed: LoadDataViewSeed) {
initWith(comboType: IngestionComboType) {
this.setState({
spec: getBlankSpec(seed.type, seed.firehoseType)
spec: getBlankSpec(comboType)
});
setTimeout(() => {
this.updateStage('connect');
}, 10);
}
/**
 * Renders one selectable ingestion-source card.
 *
 * Returns null while `state.overlordModules` has not been set yet. Once it is
 * set, the card is "good to go" when no `requiredModule` is given or when the
 * module list includes it; otherwise the card gets the `disabled` class.
 *
 * Clicking a good-to-go card calls `initWith(comboType)`. Clicking a disabled
 * card stores an explanatory message in `state.overlordModuleNeededMessage`.
 *
 * @param title text shown on the card and used in the "module needed" message
 * @param comboType ingestion combo type passed to `initWith`
 * @param requiredModule module that must appear in `state.overlordModules`, if any
 */
renderIngestionCard(title: string, comboType: IngestionComboType, requiredModule?: string) {
  const { overlordModules } = this.state;
  // Module list not available yet: render nothing.
  if (!overlordModules) return null;

  const goodToGo = !requiredModule || overlordModules.includes(requiredModule);

  return <Card
    className={classNames({ disabled: !goodToGo })}
    interactive
    onClick={() => {
      if (goodToGo) {
        this.initWith(comboType);
      } else {
        // The original wording omitted the noun ("requires the 'x' to be loaded").
        this.setState({
          overlordModuleNeededMessage: `${title} ingestion requires the '${requiredModule}' extension to be loaded.`
        });
      }
    }}
  >
    {title}
  </Card>;
}
renderInitStage() {
const showStreaming = false;
const { goToTask } = this.props;
const { overlordModuleNeededMessage } = this.state;
return <>
<div className="intro">
Please specify where your raw data is located
</div>
<Callout intent={Intent.SUCCESS} icon={IconNames.INFO_SIGN}>
Welcome to the Druid data loader.
This project is under active development and we plan to support many other sources of raw data, including stream hubs such as Apache Kafka and AWS Kinesis, in the next few releases.
</Callout>
{
showStreaming &&
<div className="section">
<div className="section-title">Stream hub</div>
<div className="cards">
<Card interactive onClick={() => this.initWith({ type: 'kafka' })}>Apache Kafka</Card>
<Card interactive onClick={() => this.initWith({ type: 'kinesis' })}>AWS Kinesis</Card>
{this.renderIngestionCard('Apache Kafka', 'kafka', 'druid-kafka-indexing-service')}
{this.renderIngestionCard('AWS Kinesis', 'kinesis', 'druid-kinesis-indexing-service')}
{this.renderIngestionCard('HTTP(s)', 'index:http')}
{this.renderIngestionCard('AWS S3', 'index:static-s3', 'druid-s3-extensions')}
{this.renderIngestionCard('Google Cloud Storage', 'index:static-google-blobstore', 'druid-google-extensions')}
{this.renderIngestionCard('Local disk', 'index:local')}
<Card interactive onClick={() => goToTask(null, 'supervisor')}>
Other (streaming)
</Card>
<Card interactive onClick={() => goToTask(null, 'task')}>
Other (batch)
</Card>
</div>
</div>
}
<div className="section">
<div className="section-title">Batch load</div>
<div className="cards">
<Card interactive onClick={() => this.initWith({ type: 'index_parallel', firehoseType: 'http' })}>HTTP(s)</Card>
<Card interactive onClick={() => this.initWith({ type: 'index_parallel', firehoseType: 'static-s3' })}>AWS S3</Card>
<Card interactive onClick={() => this.initWith({ type: 'index_parallel', firehoseType: 'static-google-blobstore' })}>Google Blobstore</Card>
<Card interactive onClick={() => this.initWith({ type: 'index_parallel', firehoseType: 'local' })}>Local disk</Card>
</div>
</div>
<Alert
icon={IconNames.WARNING_SIGN}
intent={Intent.WARNING}
isOpen={Boolean(overlordModuleNeededMessage)}
confirmButtonText="Close"
onConfirm={() => this.setState({ overlordModuleNeededMessage: null })}
>
<p>{overlordModuleNeededMessage}</p>
</Alert>
</>;
}
@ -430,7 +513,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
// ==================================================================
async queryForConnect(initRun = false) {
const { spec } = this.state;
const { spec, sampleStrategy } = this.state;
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
let issue: string | undefined;
@ -451,7 +534,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
let sampleResponse: SampleResponse;
try {
sampleResponse = await sampleForConnect(spec);
sampleResponse = await sampleForConnect(spec, sampleStrategy);
} catch (e) {
this.setState({
inputQueryState: new QueryState({ error: e.message })
@ -466,7 +549,8 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
}
renderConnectStage() {
const { spec, inputQueryState } = this.state;
const { spec, inputQueryState, sampleStrategy } = this.state;
const specType = getSpecType(spec);
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
const isBlank = !ioConfig.type;
@ -488,7 +572,11 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
const inputData = inputQueryState.data;
mainFill = <TextArea
className="raw-lines"
value={(inputData.every(l => !l) ? inputData.map(_ => '[Binary data]') : inputData).join('\n')}
value={
inputData.length ?
(inputData.every(l => !l) ? inputData.map(_ => '<Binary data>') : inputData.map(showRawLine)).join('\n') :
'No data returned from sampler'
}
readOnly
/>;
}
@ -531,6 +619,15 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
</Callout>
</FormGroup>
}
{
(specType === 'kafka' || specType === 'kinesis') &&
<FormGroup label="Where should the data be sampled from?">
<HTMLSelect value={sampleStrategy} onChange={e => this.setState({ sampleStrategy: e.target.value as any })}>
<option value="start">Start of stream</option>
<option value="end">End of the stream</option>
</HTMLSelect>
</FormGroup>
}
<Button
text="Preview"
disabled={isBlank}
@ -552,7 +649,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
// ==================================================================
async queryForParser(initRun = false) {
const { spec, cacheKey } = this.state;
const { spec, sampleStrategy, cacheKey } = this.state;
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
@ -576,7 +673,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
let sampleResponse: SampleResponse;
try {
sampleResponse = await sampleForParser(spec, cacheKey);
sampleResponse = await sampleForParser(spec, sampleStrategy, cacheKey);
} catch (e) {
this.setState({
parserQueryState: new QueryState({ error: e.message })
@ -834,7 +931,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
// ==================================================================
async queryForTimestamp(initRun = false) {
const { spec, cacheKey } = this.state;
const { spec, sampleStrategy, cacheKey } = this.state;
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
@ -858,7 +955,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
let sampleResponse: SampleResponse;
try {
sampleResponse = await sampleForTimestamp(spec, cacheKey);
sampleResponse = await sampleForTimestamp(spec, sampleStrategy, cacheKey);
} catch (e) {
this.setState({
timestampQueryState: new QueryState({ error: e.message })
@ -1036,7 +1133,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
// ==================================================================
async queryForTransform(initRun = false) {
const { spec, cacheKey } = this.state;
const { spec, sampleStrategy, cacheKey } = this.state;
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
@ -1060,7 +1157,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
let sampleResponse: SampleResponse;
try {
sampleResponse = await sampleForTransform(spec, cacheKey);
sampleResponse = await sampleForTransform(spec, sampleStrategy, cacheKey);
} catch (e) {
this.setState({
transformQueryState: new QueryState({ error: e.message })
@ -1179,6 +1276,22 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
Click "Preview" to see the result of any specified transforms.
</p>
</Callout>
{
Boolean(transformQueryState.error && transforms.length) &&
<FormGroup>
<Button
icon={IconNames.EDIT}
text="Edit last added transform"
intent={Intent.PRIMARY}
onClick={() => {
this.setState({
selectedTransformIndex: transforms.length - 1,
selectedTransform: transforms[transforms.length - 1]
});
}}
/>
</FormGroup>
}
{this.renderTransformControls()}
<Button
text="Preview"
@ -1262,7 +1375,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
// ==================================================================
async queryForFilter(initRun = false) {
const { spec, cacheKey } = this.state;
const { spec, sampleStrategy, cacheKey } = this.state;
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
@ -1286,7 +1399,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
let sampleResponse: SampleResponse;
try {
sampleResponse = await sampleForFilter(spec, cacheKey);
sampleResponse = await sampleForFilter(spec, sampleStrategy, cacheKey);
} catch (e) {
this.setState({
filterQueryState: new QueryState({ error: e.message })
@ -1545,7 +1658,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
// ==================================================================
async queryForSchema(initRun = false) {
const { spec, cacheKey } = this.state;
const { spec, sampleStrategy, cacheKey } = this.state;
const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {};
const parser: Parser = deepGet(spec, 'dataSchema.parser') || {};
@ -1569,7 +1682,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
let sampleResponse: SampleResponse;
try {
sampleResponse = await sampleForSchema(spec, cacheKey);
sampleResponse = await sampleForSchema(spec, sampleStrategy, cacheKey);
} catch (e) {
this.setState({
schemaQueryState: new QueryState({ error: e.message })
@ -1729,7 +1842,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
<Switch
checked={dimensionMode === 'specific'}
onChange={() => this.setState({ newDimensionMode: dimensionMode === 'specific' ? 'auto-detect' : 'specific' })}
label="Set dimensions and metrics"
label="Explicitly specify dimension list"
/>
<Popover
content={
@ -1816,12 +1929,12 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
}
renderChangeRollupAction() {
const { newRollup, spec, cacheKey } = this.state;
const { newRollup, spec, sampleStrategy, cacheKey } = this.state;
if (newRollup === null) return;
return <AsyncActionDialog
action={async () => {
const sampleResponse = await sampleForTransform(spec, cacheKey);
const sampleResponse = await sampleForTransform(spec, sampleStrategy, cacheKey);
this.updateSpec(updateSchemaWithSample(spec, headerAndRowsFromSampleResponse(sampleResponse), getDimensionMode(spec), newRollup));
setTimeout(() => {
this.queryForSchema();
@ -1843,13 +1956,13 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
}
renderChangeDimensionModeAction() {
const { newDimensionMode, spec, cacheKey } = this.state;
const { newDimensionMode, spec, sampleStrategy, cacheKey } = this.state;
if (newDimensionMode === null) return;
const autoDetect = newDimensionMode === 'auto-detect';
return <AsyncActionDialog
action={async () => {
const sampleResponse = await sampleForTransform(spec, cacheKey);
const sampleResponse = await sampleForTransform(spec, sampleStrategy, cacheKey);
this.updateSpec(updateSchemaWithSample(spec, headerAndRowsFromSampleResponse(sampleResponse), newDimensionMode, getRollup(spec)));
setTimeout(() => {
this.queryForSchema();
@ -1864,8 +1977,8 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
<p>
{
autoDetect ?
'Are you sure you dont want to set the dimensions and metrics explicitly?' :
'Are you sure you want to set dimensions and metrics explicitly?'
`Are you sure you don't want to explicitly specify a dimension list?` :
`Are you sure you want to explicitly specify a dimension list?`
}
</p>
<p>
@ -2019,7 +2132,6 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
const { spec } = this.state;
const tuningConfig: TuningConfig = deepGet(spec, 'tuningConfig') || {};
const granularitySpec: GranularitySpec = deepGet(spec, 'dataSchema.granularitySpec') || {};
const myIsParallel = isParallel(spec);
return <>
<div className="main">
@ -2053,68 +2165,7 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
<div className="other">
<H5>Secondary partitioning</H5>
<AutoForm
fields={[
{
name: 'partitionDimensions',
type: 'string-array',
disabled: myIsParallel,
info: <>
<p>
Does not currently work with parallel ingestion
</p>
<p>
The dimensions to partition on.
Leave blank to select all dimensions. Only used with forceGuaranteedRollup = true, will be ignored otherwise.
</p>
</>
},
{
name: 'forceGuaranteedRollup',
type: 'boolean',
disabled: myIsParallel,
info: <>
<p>
Does not currently work with parallel ingestion
</p>
<p>
Forces guaranteeing the perfect rollup.
The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased.
If this is set to true, the index task will read the entire input data twice: one for finding the optimal number of partitions per time chunk and one for generating segments.
</p>
</>
},
{
name: 'targetPartitionSize',
type: 'number',
info: <>
Target number of rows to include in a partition, should be a number that targets segments of 500MB~1GB.
</>
},
{
name: 'numShards',
type: 'number',
info: <>
Directly specify the number of shards to create.
If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.
</>
},
{
name: 'maxRowsPerSegment',
type: 'number',
defaultValue: 5000000,
info: <>
Determines how many rows are in each segment.
</>
},
{
name: 'maxTotalRows',
type: 'number',
defaultValue: 20000000,
info: <>
Total number of rows in segments waiting for being pushed.
</>
}
]}
fields={getPartitionRelatedTuningSpecFormFields(getSpecType(spec) || 'index')}
model={tuningConfig}
onChange={t => this.updateSpec(deepSet(spec, 'tuningConfig', t))}
/>
@ -2250,7 +2301,48 @@ export class LoadDataView extends React.Component<LoadDataViewProps, LoadDataVie
onChange={s => this.updateSpec(s)}
/>
</div>
<div className="other"/>
<div className="other">
<H5>Parse error reporting</H5>
<AutoForm
fields={[
{
name: 'tuningConfig.logParseExceptions',
label: 'Log parse exceptions',
type: 'boolean',
defaultValue: false,
info: <>
If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.
</>
},
{
name: 'tuningConfig.maxParseExceptions',
label: 'Max parse exceptions',
type: 'number',
placeholder: '(unlimited)',
info: <>
The maximum number of parse exceptions that can occur before the task halts ingestion and fails.
</>
},
{
name: 'tuningConfig.maxSavedParseExceptions',
label: 'Max saved parse exceptions',
type: 'number',
defaultValue: 0,
info: <>
<p>
When a parse exception occurs, Druid can keep track of the most recent parse exceptions.
</p>
<p>
This property limits how many exception instances will be saved.
These saved exceptions will be made available after the task finishes in the task view.
</p>
</>
}
]}
model={spec}
onChange={s => this.updateSpec(s)}
/>
</div>
<div className="control">
<Callout className="intro">
<p>

View File

@ -105,7 +105,7 @@ export class SqlView extends React.Component<SqlViewProps, SqlViewState> {
} else {
const actualQuery = wrapQuery ?
`SELECT * FROM (${queryString.trim().replace(/;+$/, '')}) LIMIT 5000` :
`SELECT * FROM (${queryString.replace(/;+(\s*)$/, '$1')}) LIMIT 2000` :
queryString;
const queryPayload: Record<string, any> = {

View File

@ -49,6 +49,7 @@ const taskTableColumns: string[] = ['Task ID', 'Type', 'Datasource', 'Created ti
export interface TasksViewProps extends React.Props<any> {
taskId: string | null;
openDialog: string | null;
goToSql: (initSql: string) => void;
goToMiddleManager: (middleManager: string) => void;
goToLoadDataView: () => void;
@ -139,8 +140,8 @@ export class TasksView extends React.Component<TasksViewProps, TasksViewState> {
killTaskId: null,
supervisorSpecDialogOpen: false,
taskSpecDialogOpen: false,
supervisorSpecDialogOpen: props.openDialog === 'supervisor',
taskSpecDialogOpen: props.openDialog === 'task',
initSpec: null,
alertErrorMsg: null,

View File

@ -55,7 +55,8 @@ module.exports = (env) => {
port: 18081,
proxy: {
'/status': proxyTarget,
'/druid': proxyTarget
'/druid': proxyTarget,
'/proxy': proxyTarget
}
},
module: {