From be16e4a4ae59aa0c49c2813ee53d4e04257a7aeb Mon Sep 17 00:00:00 2001 From: Vadim Ogievetsky Date: Fri, 17 May 2019 14:01:27 -0700 Subject: [PATCH] Web console, adding Apache Kafka and AWS Kinesis to the data loader (#7643) * adding kafka and kinesis to the data loader * feature detect * copy fixes * wording fixes * added missing spec type * increase timeout * Call it Google Cloud Storage --- web-console/src/components/array-input.tsx | 5 +- web-console/src/components/auto-form.tsx | 7 +- web-console/src/components/header-bar.tsx | 7 +- web-console/src/console-application.tsx | 19 +- web-console/src/utils/druid-query.ts | 1 + web-console/src/utils/general.tsx | 4 + web-console/src/utils/ingestion-spec.tsx | 522 +++++++++++++++++++-- web-console/src/utils/sampler.ts | 133 ++++-- web-console/src/views/datasource-view.tsx | 7 + web-console/src/views/load-data-view.scss | 10 +- web-console/src/views/load-data-view.tsx | 356 ++++++++------ web-console/src/views/sql-view.tsx | 2 +- web-console/src/views/tasks-view.tsx | 5 +- web-console/webpack.config.js | 3 +- 14 files changed, 850 insertions(+), 231 deletions(-) diff --git a/web-console/src/components/array-input.tsx b/web-console/src/components/array-input.tsx index 4c6d84c8840..e771d443a15 100644 --- a/web-console/src/components/array-input.tsx +++ b/web-console/src/components/array-input.tsx @@ -17,7 +17,7 @@ */ -import { InputGroup, ITagInputProps } from '@blueprintjs/core'; +import { ITagInputProps, TextArea } from '@blueprintjs/core'; import * as React from 'react'; export interface ArrayInputProps extends ITagInputProps { @@ -46,13 +46,14 @@ export class ArrayInput extends React.Component; } } diff --git a/web-console/src/components/auto-form.tsx b/web-console/src/components/auto-form.tsx index 4add8ea1308..97876990721 100644 --- a/web-console/src/components/auto-form.tsx +++ b/web-console/src/components/auto-form.tsx @@ -47,7 +47,7 @@ export interface Field { name: string; label?: string; info?: 
React.ReactNode; - type: 'number' | 'size-bytes' | 'string' | 'boolean' | 'string-array' | 'json'; + type: 'number' | 'size-bytes' | 'string' | 'duration' | 'boolean' | 'string-array' | 'json'; defaultValue?: any; isDefined?: (model: T) => boolean; disabled?: boolean; @@ -140,7 +140,7 @@ export class AutoForm> extends React.Component; } - private renderStringInput(field: Field): JSX.Element { + private renderStringInput(field: Field, sanitize?: (str: string) => string): JSX.Element { const { model, large } = this.props; const suggestionsMenu = field.suggestions ? @@ -178,7 +178,7 @@ export class AutoForm> extends React.Component { const v = e.target.value; - this.fieldChange(field, v === '' ? undefined : v); + this.fieldChange(field, v === '' ? undefined : (sanitize ? sanitize(v) : v)); }} placeholder={field.placeholder} rightElement={ @@ -252,6 +252,7 @@ export class AutoForm> extends React.Component str.toUpperCase().replace(/[^0-9PYMDTHS.,]/g, '')); case 'boolean': return this.renderBooleanInput(field); case 'string-array': return this.renderStringArrayInput(field); case 'json': return this.renderJSONInput(field); diff --git a/web-console/src/components/header-bar.tsx b/web-console/src/components/header-bar.tsx index 538f8a319fd..78f684cdf64 100644 --- a/web-console/src/components/header-bar.tsx +++ b/web-console/src/components/header-bar.tsx @@ -44,7 +44,6 @@ import { LEGACY_COORDINATOR_CONSOLE, LEGACY_OVERLORD_CONSOLE } from '../variables'; -import { LoadDataViewSeed } from '../views/load-data-view'; import './header-bar.scss'; @@ -53,7 +52,7 @@ export type HeaderActiveTab = null | 'load-data' | 'query' | 'datasources' | 'se export interface HeaderBarProps extends React.Props { active: HeaderActiveTab; hideLegacy: boolean; - goToLoadDataView: (loadDataViewSeed: LoadDataViewSeed) => void; + goToLoadDataView: () => void; } export interface HeaderBarState { @@ -160,15 +159,15 @@ export class HeaderBar extends React.Component { minimal={!loadDataPrimary} 
intent={loadDataPrimary ? Intent.PRIMARY : Intent.NONE} /> - + - + diff --git a/web-console/src/console-application.tsx b/web-console/src/console-application.tsx index b2c48925243..ce466725518 100644 --- a/web-console/src/console-application.tsx +++ b/web-console/src/console-application.tsx @@ -32,7 +32,7 @@ import { QueryManager } from './utils'; import { DRUID_DOCS_API, DRUID_DOCS_SQL } from './variables'; import { DatasourcesView } from './views/datasource-view'; import { HomeView } from './views/home-view'; -import { LoadDataView, LoadDataViewSeed } from './views/load-data-view'; +import { LoadDataView } from './views/load-data-view'; import { LookupsView } from './views/lookups-view'; import { SegmentsView } from './views/segments-view'; import { ServersView } from './views/servers-view'; @@ -100,8 +100,9 @@ export class ConsoleApplication extends React.Component { - this.loadDataViewSeed = null; + this.initSpec = null; this.taskId = null; + this.openDialog = null; this.datasource = null; this.onlyUnavailable = null; this.initSql = null; @@ -159,14 +161,15 @@ export class ConsoleApplication extends React.Component { - if (loadDataViewSeed) this.loadDataViewSeed = loadDataViewSeed; + private goToLoadDataView = (initSpec?: any) => { + if (initSpec) this.initSpec = initSpec; window.location.hash = 'load-data'; this.resetInitialsWithDelay(); } - private goToTask = (taskId: string | null) => { + private goToTask = (taskId: string | null, openDialog?: string) => { this.taskId = taskId; + if (openDialog) this.openDialog = openDialog; window.location.hash = 'tasks'; this.resetInitialsWithDelay(); } @@ -205,7 +208,7 @@ export class ConsoleApplication extends React.Component { - return this.wrapInViewContainer('load-data', , 'narrow-pad'); + return this.wrapInViewContainer('load-data', , 'narrow-pad'); } private wrappedSqlView = () => { @@ -224,7 +227,7 @@ export class ConsoleApplication extends React.Component { const { noSqlMode } = this.state; - return 
this.wrapInViewContainer('tasks', , 'scrollable'); + return this.wrapInViewContainer('tasks', , 'scrollable'); } private wrappedServersView = () => { diff --git a/web-console/src/utils/druid-query.ts b/web-console/src/utils/druid-query.ts index a9e80180568..9230d1f6932 100644 --- a/web-console/src/utils/druid-query.ts +++ b/web-console/src/utils/druid-query.ts @@ -27,6 +27,7 @@ export function parseHtmlError(htmlStr: string): string | null { return htmlStr .substring(startIndex + 10, endIndex) .replace(/"/g, '"') + .replace(/'/g, `'`) .replace(/>/g, '>'); } diff --git a/web-console/src/utils/general.tsx b/web-console/src/utils/general.tsx index 599ce1cd897..52aff78ba4a 100644 --- a/web-console/src/utils/general.tsx +++ b/web-console/src/utils/general.tsx @@ -207,6 +207,10 @@ export function parseStringToJSON(s: string): JSON | null { } } +export function selectDefined(xs: (Q | null | undefined)[]): Q[] { + return xs.filter(Boolean) as any; +} + export function filterMap(xs: T[], f: (x: T, i?: number) => Q | null | undefined): Q[] { return (xs.map(f) as any).filter(Boolean); } diff --git a/web-console/src/utils/ingestion-spec.tsx b/web-console/src/utils/ingestion-spec.tsx index f89dadb250e..9e506189478 100644 --- a/web-console/src/utils/ingestion-spec.tsx +++ b/web-console/src/utils/ingestion-spec.tsx @@ -113,6 +113,7 @@ export interface ParseSpec { } export function hasParallelAbility(spec: IngestionSpec): boolean { + const specType = getSpecType(spec); return spec.type === 'index' || spec.type === 'index_parallel'; } @@ -132,6 +133,10 @@ export function getRollup(spec: IngestionSpec): boolean { return typeof specRollup === 'boolean' ? 
specRollup : true; } +export function getSpecType(spec: IngestionSpec): IngestionType | undefined { + return deepGet(spec, 'type') || deepGet(spec, 'ioConfig.type') || deepGet(spec, 'tuningConfig.type'); +} + export function changeParallel(spec: IngestionSpec, parallel: boolean): IngestionSpec { if (!hasParallelAbility(spec)) return spec; const newType = parallel ? 'index_parallel' : 'index'; @@ -561,7 +566,7 @@ export interface IoConfig { period?: string; useEarliestOffset?: boolean; stream?: string; - region?: string; + endpoint?: string; useEarliestSequenceNumber?: boolean; } @@ -597,7 +602,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F name: 'firehose.uris', label: 'URIs', type: 'string-array', - placeholder: 'https://example.com/path/to/file.ext', + placeholder: 'https://example.com/path/to/file1.ext, https://example.com/path/to/file2.ext', info: <>

The full URI of your file. To ingest from multiple URIs, use commas to separate each individual URI.

@@ -636,7 +641,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F name: 'firehose.uris', label: 'S3 URIs', type: 'string-array', - placeholder: 's3://your-bucket/some-file.extension', + placeholder: 's3://your-bucket/some-file1.ext, s3://your-bucket/some-file2.ext', isDefined: (ioConfig) => !deepGet(ioConfig, 'firehose.prefixes'), info: <>

The full S3 URI of your file. To ingest from multiple URIs, use commas to separate each individual URI.

@@ -647,7 +652,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F name: 'firehose.prefixes', label: 'S3 prefixes', type: 'string-array', - placeholder: 's3://your-bucket/some-path', + placeholder: 's3://your-bucket/some-path1, s3://your-bucket/some-path2', isDefined: (ioConfig) => !deepGet(ioConfig, 'firehose.uris'), info: <>

A list of paths (with bucket) where your files are stored.

@@ -700,22 +705,62 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F return [ { name: 'stream', - type: 'string' + type: 'string', + placeholder: 'your-kinesis-stream', + info: <> + The Kinesis stream to read. + }, { - name: 'region', - type: 'string' + name: 'endpoint', + type: 'string', + defaultValue: 'kinesis.us-east-1.amazonaws.com', + suggestions: [ + 'kinesis.us-east-2.amazonaws.com', + 'kinesis.us-east-1.amazonaws.com', + 'kinesis.us-west-1.amazonaws.com', + 'kinesis.us-west-2.amazonaws.com', + 'kinesis.ap-east-1.amazonaws.com', + 'kinesis.ap-south-1.amazonaws.com', + 'kinesis.ap-northeast-3.amazonaws.com', + 'kinesis.ap-northeast-2.amazonaws.com', + 'kinesis.ap-southeast-1.amazonaws.com', + 'kinesis.ap-southeast-2.amazonaws.com', + 'kinesis.ap-northeast-1.amazonaws.com', + 'kinesis.ca-central-1.amazonaws.com', + 'kinesis.cn-north-1.amazonaws.com.com', + 'kinesis.cn-northwest-1.amazonaws.com.com', + 'kinesis.eu-central-1.amazonaws.com', + 'kinesis.eu-west-1.amazonaws.com', + 'kinesis.eu-west-2.amazonaws.com', + 'kinesis.eu-west-3.amazonaws.com', + 'kinesis.eu-north-1.amazonaws.com', + 'kinesis.sa-east-1.amazonaws.com', + 'kinesis.us-gov-east-1.amazonaws.com', + 'kinesis.us-gov-west-1.amazonaws.com' + ], + info: <> + The AWS Kinesis stream endpoint for a region. + You can find a list of endpoints here. + }, { - name: 'useEarliestOffset', - type: 'boolean', - defaultValue: true, - isDefined: (i: IoConfig) => i.type === 'kafka' || i.type === 'kinesis' + name: 'awsAssumedRoleArn', + label: 'AWS assumed role ARN', + type: 'string', + placeholder: 'optional', + info: <> + The AWS assumed role to use for additional permissions. + }, { - name: 'useEarliestSequenceNumber', - type: 'boolean', - isDefined: (i: IoConfig) => i.type === 'kinesis' + name: 'awsExternalId', + label: 'AWS external ID', + type: 'string', + placeholder: 'optional', + info: <> + The AWS external id to use for additional permissions. 
+ } ]; } @@ -765,7 +810,7 @@ export function issueWithIoConfig(ioConfig: IoConfig | undefined): string | null break; case 'kinesis': - // if (!ioConfig.stream) return "must have a stream"; + if (!ioConfig.stream) return 'must have a stream'; break; } @@ -830,17 +875,174 @@ export function getIoConfigTuningFormFields(ingestionComboType: IngestionComboTy return []; case 'kafka': - return [ - // ToDo: fill this in - ]; - case 'kinesis': return [ - // ToDo: fill this in + { + name: 'useEarliestOffset', + type: 'boolean', + defaultValue: false, + isDefined: (i: IoConfig) => i.type === 'kafka', + info: <> +

+ If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. + This flag determines whether it retrieves the earliest or latest offsets in Kafka. + Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run. +

+ + }, + { + name: 'skipOffsetGaps', + type: 'boolean', + defaultValue: false, + isDefined: (i: IoConfig) => i.type === 'kafka', + info: <> +

+ Whether or not to allow gaps of missing offsets in the Kafka stream. + This is required for compatibility with implementations such as MapR Streams which does not guarantee consecutive offsets. + If this is false, an exception will be thrown if offsets are not consecutive. +

+ + }, + { + name: 'pollTimeout', + type: 'number', + defaultValue: 100, + isDefined: (i: IoConfig) => i.type === 'kafka', + info: <> +

The length of time to wait for the Kafka consumer to poll records, in milliseconds.

+ + }, + + { + name: 'useEarliestSequenceNumber', + type: 'boolean', + defaultValue: false, + isDefined: (i: IoConfig) => i.type === 'kinesis', + info: <> + If a supervisor is managing a dataSource for the first time, it will obtain a set of starting sequence numbers from Kinesis. + This flag determines whether it retrieves the earliest or latest sequence numbers in Kinesis. + Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run. + + }, + { + name: 'recordsPerFetch', + type: 'number', + defaultValue: 2000, + isDefined: (i: IoConfig) => i.type === 'kinesis', + info: <> + The number of records to request per GetRecords call to Kinesis. + + }, + { + name: 'fetchDelayMillis', + type: 'number', + defaultValue: 1000, + isDefined: (i: IoConfig) => i.type === 'kinesis', + info: <> + Time in milliseconds to wait between subsequent GetRecords calls to Kinesis. + + }, + { + name: 'deaggregate', + type: 'boolean', + isDefined: (i: IoConfig) => i.type === 'kinesis', + info: <> + Whether to use the de-aggregate function of the KCL. + + }, + + { + name: 'replicas', + type: 'number', + defaultValue: 1, + info: <> +

The number of replica sets, where 1 means a single set of tasks (no replication). Replica tasks will always be assigned to different workers to provide resiliency against process failure.

+ + }, + { + name: 'taskCount', + type: 'number', + defaultValue: 1, + info: <> +

+ The maximum number of reading tasks in a replica set. + This means that the maximum number of reading tasks will be taskCount * replicas and the total number of tasks (reading + publishing) will be higher than this. See the 'Capacity Planning' section of the Druid documentation for more details. +

+ + }, + { + name: 'taskDuration', + type: 'duration', + defaultValue: 'PT1H', + info: <> +

+ The length of time before tasks stop reading and begin publishing their segment. +

+ + }, + { + name: 'startDelay', + type: 'duration', + defaultValue: 'PT5S', + info: <> +

+ The period to wait before the supervisor starts managing tasks. +

+ + }, + { + name: 'period', + type: 'duration', + defaultValue: 'PT30S', + info: <> +

+ How often the supervisor will execute its management logic. +

+

+ Note that the supervisor will also run in response to certain events (such as tasks succeeding, failing, and reaching their taskDuration) so this value specifies the maximum time between iterations. +

+ + }, + { + name: 'completionTimeout', + type: 'duration', + defaultValue: 'PT30M', + info: <> +

+ The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. + The publishing clock for a task begins roughly after taskDuration elapses. +

+ + }, + { + name: 'lateMessageRejectionPeriod', + type: 'string', + placeholder: '(none)', + info: <> +

+ Configure tasks to reject messages with timestamps earlier than this period before the task was created; + for example if this is set to PT1H and the supervisor creates a task at 2016-01-01T12:00Z, messages with timestamps earlier than 2016-01-01T11:00Z will be dropped. +

+

+ This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). +

+ + }, + { + name: 'earlyMessageRejectionPeriod', + type: 'string', + placeholder: '(none)', + info: <> +

+ Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; + for example if this is set to PT1H, the taskDuration is set to PT1H and the supervisor creates a task at 2016-01-01T12:00Z, messages with timestamps later than 2016-01-01T14:00Z will be dropped. +

+ + } ]; } - throw new Error(`unknown input type ${ingestionComboType}`); + throw new Error(`unknown ingestion combo type ${ingestionComboType}`); } // --------------------------------------- @@ -906,9 +1108,116 @@ export interface TuningConfig { reportParseExceptions?: boolean; pushTimeout?: number; segmentWriteOutMediumFactory?: any; - // ... + intermediateHandoffPeriod?: string; + handoffConditionTimeout?: number; + resetOffsetAutomatically?: boolean; + workerThreads?: number; + chatThreads?: number; + chatRetries?: number; + httpTimeout?: string; + shutdownTimeout?: string; + offsetFetchPeriod?: string; maxParseExceptions?: number; maxSavedParseExceptions?: number; + recordBufferSize?: number; + recordBufferOfferTimeout?: number; + recordBufferFullWait?: number; + fetchSequenceNumberTimeout?: number; + fetchThreads?: number; +} + +export function getPartitionRelatedTuningSpecFormFields(specType: IngestionType): Field[] { + switch (specType) { + case 'index': + case 'index_parallel': + const myIsParallel = specType === 'index_parallel'; + return [ + { + name: 'partitionDimensions', + type: 'string-array', + disabled: myIsParallel, + info: <> +

+ Does not currently work with parallel ingestion +

+

+ The dimensions to partition on. + Leave blank to select all dimensions. Only used with forceGuaranteedRollup = true, will be ignored otherwise. +

+ + }, + { + name: 'forceGuaranteedRollup', + type: 'boolean', + disabled: myIsParallel, + info: <> +

+ Does not currently work with parallel ingestion +

+

+ Forces guaranteeing the perfect rollup. + The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. + If this is set to true, the index task will read the entire input data twice: once for finding the optimal number of partitions per time chunk and once for generating segments. +

+ + }, + { + name: 'targetPartitionSize', + type: 'number', + info: <> + Target number of rows to include in a partition, should be a number that targets segments of 500MB~1GB. + + }, + { + name: 'numShards', + type: 'number', + info: <> + Directly specify the number of shards to create. + If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set. + + }, + { + name: 'maxRowsPerSegment', + type: 'number', + defaultValue: 5000000, + info: <> + Determines how many rows are in each segment. + + }, + { + name: 'maxTotalRows', + type: 'number', + defaultValue: 20000000, + info: <> + Total number of rows in segments waiting for being pushed. + + } + ]; + + case 'kafka': + case 'kinesis': + return [ + { + name: 'maxRowsPerSegment', + type: 'number', + defaultValue: 5000000, + info: <> + Determines how many rows are in each segment. + + }, + { + name: 'maxTotalRows', + type: 'number', + defaultValue: 20000000, + info: <> + Total number of rows in segments waiting for being pushed. + + } + ]; + + } + + throw new Error(`unknown spec type ${specType}`); } const TUNING_CONFIG_FORM_FIELDS: Field[] = [ @@ -928,18 +1237,37 @@ const TUNING_CONFIG_FORM_FIELDS: Field[] = [ Used in determining when intermediate persists to disk should occur. }, + { + name: 'intermediatePersistPeriod', + type: 'duration', + defaultValue: 'PT10M', + isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis', + info: <> + The period that determines the rate at which intermediate persists occur. + + }, + { + name: 'intermediateHandoffPeriod', + type: 'duration', + defaultValue: 'P2147483647D', + isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis', + info: <> + How often the tasks should hand off segments. 
+ Handoff will happen either if maxRowsPerSegment or maxTotalRows is hit or every intermediateHandoffPeriod, whichever happens earlier. + + }, { name: 'maxPendingPersists', - type: 'number' + type: 'number', + info: <> + Maximum number of persists that can be pending but not started. + If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. + }, { name: 'forceExtendableShardSpecs', type: 'boolean' }, - { - name: 'reportParseExceptions', - type: 'boolean' - }, { name: 'pushTimeout', type: 'number', @@ -978,7 +1306,7 @@ const TUNING_CONFIG_FORM_FIELDS: Field[] = [ }, { name: 'chatHandlerTimeout', - type: 'string', + type: 'duration', defaultValue: 'PT10S', info: <> Timeout for reporting the pushed segments in worker tasks. @@ -991,6 +1319,136 @@ const TUNING_CONFIG_FORM_FIELDS: Field[] = [ info: <> Retries for reporting the pushed segments in worker tasks. + }, + { + name: 'handoffConditionTimeout', + type: 'number', + defaultValue: 0, + isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis', + info: <> + Milliseconds to wait for segment handoff. + 0 means to wait forever. + + }, + { + name: 'resetOffsetAutomatically', + type: 'boolean', + defaultValue: false, + isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis', + info: <> + Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. + + }, + { + name: 'workerThreads', + type: 'number', + placeholder: 'min(10, taskCount)', + isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis', + info: <> + The number of threads that will be used by the supervisor for asynchronous operations. 
+ + }, + { + name: 'chatThreads', + type: 'number', + placeholder: 'min(10, taskCount * replicas)', + isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis', + info: <> + The number of threads that will be used for communicating with indexing tasks. + + }, + { + name: 'chatRetries', + type: 'number', + defaultValue: 8, + isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis', + info: <> + The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive. + + }, + { + name: 'httpTimeout', + type: 'duration', + defaultValue: 'PT10S', + isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis', + info: <> + How long to wait for a HTTP response from an indexing task. + + }, + { + name: 'shutdownTimeout', + type: 'duration', + defaultValue: 'PT80S', + isDefined: (t: TuningConfig) => t.type === 'kafka' || t.type === 'kinesis', + info: <> + How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting. + + }, + { + name: 'offsetFetchPeriod', + type: 'duration', + defaultValue: 'PT30S', + isDefined: (t: TuningConfig) => t.type === 'kafka', + info: <> + How often the supervisor queries Kafka and the indexing tasks to fetch current offsets and calculate lag. + + }, + { + name: 'recordBufferSize', + type: 'number', + defaultValue: 10000, + isDefined: (t: TuningConfig) => t.type === 'kinesis', + info: <> + Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread. + + }, + { + name: 'recordBufferOfferTimeout', + type: 'number', + defaultValue: 5000, + isDefined: (t: TuningConfig) => t.type === 'kinesis', + info: <> + Length of time in milliseconds to wait for space to become available in the buffer before timing out. 
+ + }, + { + name: 'recordBufferFullWait', + type: 'number', + defaultValue: 5000, + isDefined: (t: TuningConfig) => t.type === 'kinesis', + info: <> + Length of time in milliseconds to wait for the buffer to drain before attempting to fetch records from Kinesis again. + + }, + { + name: 'fetchSequenceNumberTimeout', + type: 'number', + defaultValue: 60000, + isDefined: (t: TuningConfig) => t.type === 'kinesis', + info: <> + Length of time in milliseconds to wait for Kinesis to return the earliest or latest sequence number for a shard. Kinesis will not return the latest sequence number if no data is actively being written to that shard. + In this case, this fetch call will repeatedly timeout and retry until fresh data is written to the stream. + + }, + { + name: 'fetchThreads', + type: 'number', + placeholder: 'max(1, {numProcessors} - 1)', + isDefined: (t: TuningConfig) => t.type === 'kinesis', + info: <> + Size of the pool of threads fetching data from Kinesis. + There is no benefit in having more threads than Kinesis shards. + + }, + { + name: 'maxRecordsPerPoll', + type: 'number', + defaultValue: 100, + isDefined: (t: TuningConfig) => t.type === 'kinesis', + info: <> + The maximum number of records/events to be fetched from buffer per poll. + The actual maximum will be max(maxRecordsPerPoll, max(bufferSize, 1)). 
+ } ]; @@ -1012,12 +1470,14 @@ export interface Bitmap { // -------------- -export function getBlankSpec(ingestionType: IngestionType = 'index', firehoseType: string | null = null): IngestionSpec { - const ioAndTuningConfigType = ingestionTypeToIoAndTuningConfigType(ingestionType); +export function getBlankSpec(comboType: IngestionComboType): IngestionSpec { + let [ingestionType, firehoseType] = comboType.split(':'); + if (ingestionType === 'index') ingestionType = 'index_parallel'; + const ioAndTuningConfigType = ingestionTypeToIoAndTuningConfigType(ingestionType as IngestionType); const granularitySpec: GranularitySpec = { type: 'uniform', - segmentGranularity: ['index', 'index_parallel'].includes(ingestionType) ? 'DAY' : 'HOUR', + segmentGranularity: ingestionType === 'index_parallel' ? 'DAY' : 'HOUR', queryGranularity: 'HOUR' }; diff --git a/web-console/src/utils/sampler.ts b/web-console/src/utils/sampler.ts index 85b1844b447..4a3f7fcc57f 100644 --- a/web-console/src/utils/sampler.ts +++ b/web-console/src/utils/sampler.ts @@ -22,7 +22,7 @@ import { getDruidErrorMessage } from './druid-query'; import { filterMap, sortWithPrefixSuffix } from './general'; import { DimensionsSpec, - getEmptyTimestampSpec, + getEmptyTimestampSpec, getSpecType, IngestionSpec, IoConfig, MetricSpec, Parser, @@ -35,17 +35,19 @@ import { QueryState } from './query-state'; const SAMPLER_URL = `/druid/indexer/v1/sampler`; const BASE_SAMPLER_CONFIG: SamplerConfig = { // skipCache: true, - numRows: 500 + numRows: 500, + timeoutMs: 15000 }; export interface SampleSpec { - type: 'index'; + type: string; spec: IngestionSpec; samplerConfig: SamplerConfig; } export interface SamplerConfig { numRows?: number; + timeoutMs?: number; cacheKey?: string; skipCache?: boolean; } @@ -79,6 +81,14 @@ function dedupe(xs: string[]): string[] { }); } +type SamplerType = 'index' | 'kafka' | 'kinesis'; + +export function getSamplerType(spec: IngestionSpec): SamplerType { + const specType = getSpecType(spec); + 
if (specType === 'kafka' || specType === 'kinesis') return specType; + return 'index'; +} + export function headerFromSampleResponse(sampleResponse: SampleResponse, ignoreColumn?: string): string[] { let columns = sortWithPrefixSuffix(dedupe( [].concat(...(filterMap(sampleResponse.data, s => s.parsed ? Object.keys(s.parsed) : null) as any)) @@ -98,6 +108,17 @@ export function headerAndRowsFromSampleResponse(sampleResponse: SampleResponse, }; } +export async function getOverlordModules(): Promise { + let statusResp: any; + try { + statusResp = await axios.get(`/proxy/overlord/status`); + } catch (e) { + throw new Error(getDruidErrorMessage(e)); + } + + return statusResp.data.modules.map((m: any) => m.artifact); +} + async function postToSampler(sampleSpec: SampleSpec, forStr: string): Promise { let sampleResp: any; try { @@ -109,24 +130,40 @@ async function postToSampler(sampleSpec: SampleSpec, forStr: string): Promise { - const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {}; +export type SampleStrategy = 'start' | 'end'; + +function makeSamplerIoConfig(ioConfig: IoConfig, samplerType: SamplerType, sampleStrategy: SampleStrategy): IoConfig { + ioConfig = deepSet(ioConfig || {}, 'type', samplerType); + if (samplerType === 'kafka') { + ioConfig = deepSet(ioConfig, 'useEarliestOffset', sampleStrategy === 'start'); + } else if (samplerType === 'kinesis') { + ioConfig = deepSet(ioConfig, 'useEarliestSequenceNumber', sampleStrategy === 'start'); + } + return ioConfig; +} + +export async function sampleForConnect(spec: IngestionSpec, sampleStrategy: SampleStrategy): Promise { + const samplerType = getSamplerType(spec); + const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy); const sampleSpec: SampleSpec = { - type: 'index', + type: samplerType, spec: { - ioConfig: deepSet(ioConfig, 'type', 'index') - // dataSchema: { - // dataSource: 'sample', - // parser: { - // type: 'string', - // parseSpec: { - // format: 'json', - 
// dimensionsSpec: {}, - // timestampSpec: getEmptyTimestampSpec() - // } - // } - // } + type: samplerType, + ioConfig, + dataSchema: { + dataSource: 'sample', + parser: { + type: 'string', + parseSpec: { + format: 'regex', + pattern: '(.*)', + columns: ['a'], + dimensionsSpec: {}, + timestampSpec: getEmptyTimestampSpec() + } + } + } } as any, samplerConfig: BASE_SAMPLER_CONFIG }; @@ -134,14 +171,16 @@ export async function sampleForConnect(spec: IngestionSpec): Promise { - const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {}; +export async function sampleForParser(spec: IngestionSpec, sampleStrategy: SampleStrategy, cacheKey: string | undefined): Promise { + const samplerType = getSamplerType(spec); + const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy); const parser: Parser = deepGet(spec, 'dataSchema.parser') || {}; const sampleSpec: SampleSpec = { - type: 'index', + type: samplerType, spec: { - ioConfig: deepSet(ioConfig, 'type', 'index'), + type: samplerType, + ioConfig: deepSet(ioConfig, 'type', samplerType), dataSchema: { dataSource: 'sample', parser: { @@ -165,15 +204,17 @@ export async function sampleForParser(spec: IngestionSpec, cacheKey: string | un return postToSampler(sampleSpec, 'parser'); } -export async function sampleForTimestamp(spec: IngestionSpec, cacheKey: string | undefined): Promise { - const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {}; +export async function sampleForTimestamp(spec: IngestionSpec, sampleStrategy: SampleStrategy, cacheKey: string | undefined): Promise { + const samplerType = getSamplerType(spec); + const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy); const parser: Parser = deepGet(spec, 'dataSchema.parser') || {}; const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {}; const sampleSpec: SampleSpec = { - type: 'index', + type: samplerType, spec: { - ioConfig: deepSet(ioConfig, 'type', 
'index'), + type: samplerType, + ioConfig: deepSet(ioConfig, 'type', samplerType), dataSchema: { dataSource: 'sample', parser: { @@ -192,8 +233,9 @@ export async function sampleForTimestamp(spec: IngestionSpec, cacheKey: string | return postToSampler(sampleSpec, 'timestamp'); } -export async function sampleForTransform(spec: IngestionSpec, cacheKey: string | undefined): Promise { - const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {}; +export async function sampleForTransform(spec: IngestionSpec, sampleStrategy: SampleStrategy, cacheKey: string | undefined): Promise { + const samplerType = getSamplerType(spec); + const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy); const parser: Parser = deepGet(spec, 'dataSchema.parser') || {}; const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {}; const transforms: Transform[] = deepGet(spec, 'dataSchema.transformSpec.transforms') || []; @@ -203,9 +245,10 @@ export async function sampleForTransform(spec: IngestionSpec, cacheKey: string | if (transforms && transforms.length) { const sampleSpecHack: SampleSpec = { - type: 'index', + type: samplerType, spec: { - ioConfig: deepSet(ioConfig, 'type', 'index'), + type: samplerType, + ioConfig: deepSet(ioConfig, 'type', samplerType), dataSchema: { dataSource: 'sample', parser: { @@ -227,9 +270,10 @@ export async function sampleForTransform(spec: IngestionSpec, cacheKey: string | } const sampleSpec: SampleSpec = { - type: 'index', + type: samplerType, spec: { - ioConfig: deepSet(ioConfig, 'type', 'index'), + type: samplerType, + ioConfig: deepSet(ioConfig, 'type', samplerType), dataSchema: { dataSource: 'sample', parser: { @@ -251,8 +295,9 @@ export async function sampleForTransform(spec: IngestionSpec, cacheKey: string | return postToSampler(sampleSpec, 'transform'); } -export async function sampleForFilter(spec: IngestionSpec, cacheKey: string | undefined): Promise { - const ioConfig: IoConfig = 
deepGet(spec, 'ioConfig') || {}; +export async function sampleForFilter(spec: IngestionSpec, sampleStrategy: SampleStrategy, cacheKey: string | undefined): Promise { + const samplerType = getSamplerType(spec); + const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy); const parser: Parser = deepGet(spec, 'dataSchema.parser') || {}; const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {}; const transforms: Transform[] = deepGet(spec, 'dataSchema.transformSpec.transforms') || []; @@ -263,9 +308,10 @@ export async function sampleForFilter(spec: IngestionSpec, cacheKey: string | un if (transforms && transforms.length) { const sampleSpecHack: SampleSpec = { - type: 'index', + type: samplerType, spec: { - ioConfig: deepSet(ioConfig, 'type', 'index'), + type: samplerType, + ioConfig: deepSet(ioConfig, 'type', samplerType), dataSchema: { dataSource: 'sample', parser: { @@ -287,9 +333,10 @@ export async function sampleForFilter(spec: IngestionSpec, cacheKey: string | un } const sampleSpec: SampleSpec = { - type: 'index', + type: samplerType, spec: { - ioConfig: deepSet(ioConfig, 'type', 'index'), + type: samplerType, + ioConfig: deepSet(ioConfig, 'type', samplerType), dataSchema: { dataSource: 'sample', parser: { @@ -312,17 +359,19 @@ export async function sampleForFilter(spec: IngestionSpec, cacheKey: string | un return postToSampler(sampleSpec, 'filter'); } -export async function sampleForSchema(spec: IngestionSpec, cacheKey: string | undefined): Promise { - const ioConfig: IoConfig = deepGet(spec, 'ioConfig') || {}; +export async function sampleForSchema(spec: IngestionSpec, sampleStrategy: SampleStrategy, cacheKey: string | undefined): Promise { + const samplerType = getSamplerType(spec); + const ioConfig: IoConfig = makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy); const parser: Parser = deepGet(spec, 'dataSchema.parser') || {}; const transformSpec: TransformSpec = 
deepGet(spec, 'dataSchema.transformSpec') || ({} as TransformSpec); const metricsSpec: MetricSpec[] = deepGet(spec, 'dataSchema.metricsSpec') || []; const queryGranularity: string = deepGet(spec, 'dataSchema.granularitySpec.queryGranularity') || 'NONE'; const sampleSpec: SampleSpec = { - type: 'index', + type: samplerType, spec: { - ioConfig: deepSet(ioConfig, 'type', 'index'), + type: samplerType, + ioConfig: deepSet(ioConfig, 'type', samplerType), dataSchema: { dataSource: 'sample', parser: whitelistKeys(parser, ['type', 'parseSpec']) as Parser, diff --git a/web-console/src/views/datasource-view.tsx b/web-console/src/views/datasource-view.tsx index 1b4b1e319fd..9139e576fbe 100644 --- a/web-console/src/views/datasource-view.tsx +++ b/web-console/src/views/datasource-view.tsx @@ -402,6 +402,8 @@ GROUP BY 1`); } getDatasourceActions(datasource: string, disabled: boolean): BasicAction[] { + const { goToSql } = this.props; + if (disabled) { return [ { @@ -418,6 +420,11 @@ GROUP BY 1`); ]; } else { return [ + { + icon: IconNames.APPLICATION, + title: 'Query with SQL', + onAction: () => goToSql(`SELECT * FROM "${datasource}"`) + }, { icon: IconNames.EXPORT, title: 'Reload data by interval', diff --git a/web-console/src/views/load-data-view.scss b/web-console/src/views/load-data-view.scss index 46de72971c0..64ac83e1d5a 100644 --- a/web-console/src/views/load-data-view.scss +++ b/web-console/src/views/load-data-view.scss @@ -38,14 +38,10 @@ font-size: 20px; } - .section-title { - margin-bottom: 10px; - font-weight: bold; - } - .cards { .bp3-card { display: inline-block; + vertical-align: top; width: 250px; height: 140px; margin-right: 15px; @@ -53,6 +49,10 @@ font-size: 24px; text-align: center; padding-top: 47px; + + &.disabled { + opacity: 0.4; + } } } } diff --git a/web-console/src/views/load-data-view.tsx b/web-console/src/views/load-data-view.tsx index 22a0dbb6ab7..e69bf262f8a 100644 --- a/web-console/src/views/load-data-view.tsx +++ 
b/web-console/src/views/load-data-view.tsx @@ -22,7 +22,7 @@ import { Button, ButtonGroup, Callout, Card, Classes, Code, - FormGroup, H5, + FormGroup, H5, HTMLSelect, Icon, Intent, Popover, Switch, TextArea } from '@blueprintjs/core'; import { IconNames } from '@blueprintjs/icons'; @@ -54,24 +54,58 @@ import { escapeColumnName } from '../utils/druid-expression'; import { possibleDruidFormatForValues } from '../utils/druid-time'; import { updateSchemaWithSample } from '../utils/druid-type'; import { - changeParallel, DimensionMode, - DimensionSpec, DimensionsSpec, DruidFilter, + changeParallel, + DimensionMode, + DimensionSpec, + DimensionsSpec, + DruidFilter, fillDataSourceName, fillParser, - FlattenField, getBlankSpec, getDimensionMode, + FlattenField, + getBlankSpec, + getDimensionMode, getDimensionSpecFormFields, - getDimensionSpecName, getDimensionSpecType, getEmptyTimestampSpec, getFilterFormFields, getFlattenFieldFormFields, - getIngestionComboType, getIoConfigFormFields, getIoConfigTuningFormFields, getMetricSpecFormFields, - getMetricSpecName, getParseSpecFormFields, getRollup, getTimestampSpecColumn, getTimestampSpecFormFields, + getDimensionSpecName, + getDimensionSpecType, + getEmptyTimestampSpec, + getFilterFormFields, + getFlattenFieldFormFields, + getIngestionComboType, + getIoConfigFormFields, + getIoConfigTuningFormFields, + getMetricSpecFormFields, + getMetricSpecName, + getParseSpecFormFields, + getPartitionRelatedTuningSpecFormFields, + getRollup, + getSpecType, + getTimestampSpecColumn, + getTimestampSpecFormFields, getTransformFormFields, - getTuningSpecFormFields, GranularitySpec, hasParallelAbility, inflateDimensionSpec, IngestionSpec, - IngestionType, IoConfig, - isColumnTimestampSpec, isParallel, issueWithIoConfig, issueWithParser, joinFilter, - MetricSpec, Parser, ParseSpec, - parseSpecHasFlatten, splitFilter, TimestampSpec, Transform, TuningConfig + getTuningSpecFormFields, + GranularitySpec, + hasParallelAbility, + inflateDimensionSpec, 
IngestionComboType, + IngestionSpec, + IngestionType, + IoConfig, + isColumnTimestampSpec, + isParallel, + issueWithIoConfig, + issueWithParser, + joinFilter, + MetricSpec, + Parser, + ParseSpec, + parseSpecHasFlatten, + splitFilter, + TimestampSpec, + Transform, + TuningConfig } from '../utils/ingestion-spec'; import { deepDelete, deepGet, deepSet } from '../utils/object-change'; import { + getOverlordModules, HeaderAndRows, headerAndRowsFromSampleResponse, SampleEntry, @@ -80,16 +114,20 @@ import { sampleForParser, sampleForSchema, sampleForTimestamp, sampleForTransform, - SampleResponse + SampleResponse, SampleStrategy } from '../utils/sampler'; import { computeFlattenPathsForData } from '../utils/spec-utils'; import './load-data-view.scss'; -export interface LoadDataViewSeed { - type?: IngestionType; - firehoseType?: string; - initSpec?: IngestionSpec; +function showRawLine(line: string): string { + if (line.includes('\n')) { + return ``; + } + if (line.length > 1000) { + return line.substr(0, 1000) + '...'; + } + return line; } function filterMatch(testString: string, searchString: string): boolean { @@ -136,8 +174,8 @@ const VIEW_TITLE: Record = { }; export interface LoadDataViewProps extends React.Props { - seed: LoadDataViewSeed | null; - goToTask: (taskId: string | null) => void; + initSpec: IngestionSpec | null; + goToTask: (taskId: string | null, openDialog?: string) => void; } export interface LoadDataViewState { @@ -151,6 +189,9 @@ export interface LoadDataViewState { newDimensionMode: DimensionMode | null; // general + overlordModules: string[] | null; + overlordModuleNeededMessage: string | null; + sampleStrategy: SampleStrategy; columnFilter: string; specialColumnsOnly: boolean; @@ -191,7 +232,7 @@ export class LoadDataView extends React.Component { this.doQueryForStage(newStage); this.setState({ stage: newStage }); @@ -360,48 +421,70 @@ export class LoadDataView extends React.Component { this.updateStage('connect'); }, 10); } + 
renderIngestionCard(title: string, comboType: IngestionComboType, requiredModule?: string) { + const { overlordModules } = this.state; + if (!overlordModules) return null; + const goodToGo = !requiredModule || overlordModules.includes(requiredModule); + + return { + if (goodToGo) { + this.initWith(comboType); + } else { + this.setState({ + overlordModuleNeededMessage: `${title} ingestion requires the '${requiredModule}' to be loaded.` + }); + } + }} + > + {title} + ; + } + renderInitStage() { - const showStreaming = false; + const { goToTask } = this.props; + const { overlordModuleNeededMessage } = this.state; return <>
Please specify where your raw data is located
- - Welcome to the Druid data loader. - This project is under active development and we plan to support many other sources of raw data, including stream hubs such as Apache Kafka and AWS Kinesis, in the next few releases. - - - { - showStreaming && -
-
Stream hub
-
- this.initWith({ type: 'kafka' })}>Apache Kafka - this.initWith({ type: 'kinesis' })}>AWS Kinesis -
-
- } - -
-
Batch load
-
- this.initWith({ type: 'index_parallel', firehoseType: 'http' })}>HTTP(s) - this.initWith({ type: 'index_parallel', firehoseType: 'static-s3' })}>AWS S3 - this.initWith({ type: 'index_parallel', firehoseType: 'static-google-blobstore' })}>Google Blobstore - this.initWith({ type: 'index_parallel', firehoseType: 'local' })}>Local disk -
+
+ {this.renderIngestionCard('Apache Kafka', 'kafka', 'druid-kafka-indexing-service')} + {this.renderIngestionCard('AWS Kinesis', 'kinesis', 'druid-kinesis-indexing-service')} + {this.renderIngestionCard('HTTP(s)', 'index:http')} + {this.renderIngestionCard('AWS S3', 'index:static-s3', 'druid-s3-extensions')} + {this.renderIngestionCard('Google Cloud Storage', 'index:static-google-blobstore', 'druid-google-extensions')} + {this.renderIngestionCard('Local disk', 'index:local')} + goToTask(null, 'supervisor')}> + Other (streaming) + + goToTask(null, 'task')}> + Other (batch) +
+ + this.setState({ overlordModuleNeededMessage: null })} + > +

{overlordModuleNeededMessage}

+
; } @@ -430,7 +513,7 @@ export class LoadDataView extends React.Component !l) ? inputData.map(_ => '[Binary data]') : inputData).join('\n')} + value={ + inputData.length ? + (inputData.every(l => !l) ? inputData.map(_ => '') : inputData.map(showRawLine)).join('\n') : + 'No data returned from sampler' + } readOnly />; } @@ -531,6 +619,15 @@ export class LoadDataView extends React.Component } + { + (specType === 'kafka' || specType === 'kinesis') && + + this.setState({ sampleStrategy: e.target.value as any })}> + + + + + }