From 0cc998d8a1dad781cf509a739229cc07fbf74641 Mon Sep 17 00:00:00 2001 From: Vadim Ogievetsky Date: Wed, 15 Dec 2021 10:28:21 -0800 Subject: [PATCH] improve spec upgrading (#12072) --- .../__snapshots__/ingestion-spec.spec.ts.snap | 77 ----- .../src/druid-models/ingestion-spec.spec.ts | 288 ++++++++++++++---- .../src/druid-models/ingestion-spec.tsx | 64 ++-- 3 files changed, 257 insertions(+), 172 deletions(-) delete mode 100644 web-console/src/druid-models/__snapshots__/ingestion-spec.spec.ts.snap diff --git a/web-console/src/druid-models/__snapshots__/ingestion-spec.spec.ts.snap b/web-console/src/druid-models/__snapshots__/ingestion-spec.spec.ts.snap deleted file mode 100644 index be836323f12..00000000000 --- a/web-console/src/druid-models/__snapshots__/ingestion-spec.spec.ts.snap +++ /dev/null @@ -1,77 +0,0 @@ -// Jest Snapshot v1, https://goo.gl/fbAQLP - -exports[`ingestion-spec upgrades 1`] = ` -Object { - "spec": Object { - "dataSchema": Object { - "dataSource": "wikipedia", - "dimensionsSpec": Object { - "dimensions": Array [ - "channel", - "cityName", - "comment", - ], - }, - "granularitySpec": Object { - "queryGranularity": "hour", - "rollup": true, - "segmentGranularity": "day", - }, - "metricsSpec": Array [ - Object { - "name": "count", - "type": "count", - }, - Object { - "fieldName": "added", - "name": "sum_added", - "type": "longSum", - }, - ], - "timestampSpec": Object { - "column": "timestamp", - "format": "iso", - }, - "transformSpec": Object { - "filter": Object { - "dimension": "commentLength", - "type": "selector", - "value": "35", - }, - "transforms": Array [ - Object { - "expression": "concat(\\"channel\\", 'lol')", - "name": "channel", - "type": "expression", - }, - ], - }, - }, - "ioConfig": Object { - "inputFormat": Object { - "flattenSpec": Object { - "fields": Array [ - Object { - "expr": "$.cityName", - "name": "cityNameAlt", - "type": "path", - }, - ], - }, - "type": "json", - }, - "inputSource": Object { - "type": "http", - "uris": Array [ - "https://static.imply.io/data/wikipedia.json.gz", - ], - }, - "type": "index_parallel", - }, - "tuningConfig": Object { - "type": "index_parallel", - }, - }, - "type": "index_parallel", -} -`; diff --git a/web-console/src/druid-models/ingestion-spec.spec.ts b/web-console/src/druid-models/ingestion-spec.spec.ts index 8a7b6bb5bf0..b76a6f985c6 100644 --- a/web-console/src/druid-models/ingestion-spec.spec.ts +++ b/web-console/src/druid-models/ingestion-spec.spec.ts @@ -19,7 +19,6 @@ import { adjustId, cleanSpec, - downgradeSpec, getColumnTypeFromHeaderAndRows, guessInputFormat, guessTypeFromSample, @@ -29,60 +28,164 @@ import { } from './ingestion-spec'; describe('ingestion-spec', () => { - const oldSpec = { - type: 'index_parallel', - spec: { - ioConfig: { - type: 'index_parallel', - firehose: { - type: 'http', - uris: ['https://static.imply.io/data/wikipedia.json.gz'], + it('upgrades / downgrades task spec', () => { + const oldTaskSpec = { + type: 'index_parallel', + spec: { + ioConfig: { + type: 'index_parallel', + firehose: { + type: 'http', + uris: ['https://static.imply.io/data/wikipedia.json.gz'], + }, + }, + tuningConfig: { + type: 'index_parallel', + }, + dataSchema: { + dataSource: 'wikipedia', + granularitySpec: { + segmentGranularity: 'day', + queryGranularity: 'hour', + rollup: true, + }, + parser: { + type: 'string', + parseSpec: { + format: 'json', + timestampSpec: { + column: 'timestamp', + format: 'iso', + }, + dimensionsSpec: { + dimensions: ['channel', 'cityName', 'comment'], + }, + flattenSpec: { + fields: [ + { + type: 'path', + name: 'cityNameAlt', + expr: '$.cityName', + }, + ], + }, + }, + }, + transformSpec: { + transforms: [ + { + type: 'expression', + name: 'channel', + expression: 'concat("channel", \'lol\')', + }, + ], + filter: { + type: 'selector', + dimension: 'commentLength', + value: '35', + }, + }, + metricsSpec: [ + { + name: 'count', + type: 'count', + }, + { + name: 'sum_added', + type: 'longSum', + fieldName: 'added', + }, + ], }, }, - tuningConfig: { - type: 'index_parallel', + }; + + expect(upgradeSpec(oldTaskSpec)).toEqual({ + spec: { + dataSchema: { + dataSource: 'wikipedia', + dimensionsSpec: { + dimensions: ['channel', 'cityName', 'comment'], + }, + granularitySpec: { + queryGranularity: 'hour', + rollup: true, + segmentGranularity: 'day', + }, + metricsSpec: [ + { + name: 'count', + type: 'count', + }, + { + fieldName: 'added', + name: 'sum_added', + type: 'longSum', + }, + ], + timestampSpec: { + column: 'timestamp', + format: 'iso', + }, + transformSpec: { + filter: { + dimension: 'commentLength', + type: 'selector', + value: '35', + }, + transforms: [ + { + expression: 'concat("channel", \'lol\')', + name: 'channel', + type: 'expression', + }, + ], + }, + }, + ioConfig: { + inputFormat: { + flattenSpec: { + fields: [ + { + expr: '$.cityName', + name: 'cityNameAlt', + type: 'path', + }, + ], + }, + type: 'json', + }, + inputSource: { + type: 'http', + uris: ['https://static.imply.io/data/wikipedia.json.gz'], + }, + type: 'index_parallel', + }, + tuningConfig: { + type: 'index_parallel', + }, }, + type: 'index_parallel', + }); + }); + + it('upgrades / downgrades supervisor spec', () => { + const oldSupervisorSpec = { + type: 'kafka', dataSchema: { - dataSource: 'wikipedia', - granularitySpec: { - segmentGranularity: 'day', - queryGranularity: 'hour', - rollup: true, - }, + dataSource: 'metrics-kafka', parser: { type: 'string', parseSpec: { format: 'json', timestampSpec: { column: 'timestamp', - format: 'iso', + format: 'auto', }, dimensionsSpec: { - dimensions: ['channel', 'cityName', 'comment'], + dimensions: [], + dimensionExclusions: ['timestamp', 'value'], }, - flattenSpec: { - fields: [ - { - type: 'path', - name: 'cityNameAlt', - expr: '$.cityName', - }, - ], - }, - }, - }, - transformSpec: { - transforms: [ - { - type: 'expression', - name: 'channel', - expression: 'concat("channel", \'lol\')', - }, - ], - filter: { - type: 'selector', - dimension: 'commentLength', - value: '35', }, }, metricsSpec: [ @@ -91,21 +194,100 @@ describe('ingestion-spec', () => { type: 'count', }, { - name: 'sum_added', - type: 'longSum', - fieldName: 'added', + name: 'value_sum', + fieldName: 'value', + type: 'doubleSum', + }, + { + name: 'value_min', + fieldName: 'value', + type: 'doubleMin', + }, + { + name: 'value_max', + fieldName: 'value', + type: 'doubleMax', }, ], + granularitySpec: { + type: 'uniform', + segmentGranularity: 'HOUR', + queryGranularity: 'NONE', + }, }, - }, - }; + tuningConfig: { + type: 'kafka', + maxRowsPerSegment: 5000000, + }, + ioConfig: { + topic: 'metrics', + consumerProperties: { + 'bootstrap.servers': 'localhost:9092', + }, + taskCount: 1, + replicas: 1, + taskDuration: 'PT1H', + }, + }; - it('upgrades', () => { - expect(upgradeSpec(oldSpec)).toMatchSnapshot(); - }); - - it('round trips', () => { - expect(downgradeSpec(upgradeSpec(oldSpec))).toMatchObject(oldSpec); + expect(upgradeSpec(oldSupervisorSpec)).toEqual({ + spec: { + dataSchema: { + dataSource: 'metrics-kafka', + dimensionsSpec: { + dimensionExclusions: ['timestamp', 'value'], + dimensions: [], + }, + granularitySpec: { + queryGranularity: 'NONE', + segmentGranularity: 'HOUR', + type: 'uniform', + }, + metricsSpec: [ + { + name: 'count', + type: 'count', + }, + { + fieldName: 'value', + name: 'value_sum', + type: 'doubleSum', + }, + { + fieldName: 'value', + name: 'value_min', + type: 'doubleMin', + }, + { + fieldName: 'value', + name: 'value_max', + type: 'doubleMax', + }, + ], + timestampSpec: { + column: 'timestamp', + format: 'auto', + }, + }, + ioConfig: { + consumerProperties: { + 'bootstrap.servers': 'localhost:9092', + }, + inputFormat: { + type: 'json', + }, + replicas: 1, + taskCount: 1, + taskDuration: 'PT1H', + topic: 'metrics', + }, + tuningConfig: { + maxRowsPerSegment: 5000000, + type: 'kafka', + }, + }, + type: 'kafka', + }); }); it('cleanSpec', () => { diff --git a/web-console/src/druid-models/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec.tsx index e800fb9d7fd..1e5b14d466d 100644 --- a/web-console/src/druid-models/ingestion-spec.tsx +++ b/web-console/src/druid-models/ingestion-spec.tsx @@ -2249,6 +2249,15 @@ export function updateSchemaWithSample( // ------------------------ export function upgradeSpec(spec: any): Partial { + if (deepGet(spec, 'type') && deepGet(spec, 'dataSchema')) { + spec = { + type: spec.type, + spec: deepDelete(spec, 'type'), + }; + } + + if (!deepGet(spec, 'spec.dataSchema.parser')) return spec; + if (deepGet(spec, 'spec.ioConfig.firehose')) { switch (deepGet(spec, 'spec.ioConfig.firehose.type')) { case 'static-s3': @@ -2262,51 +2271,22 @@ export function upgradeSpec(spec: any): Partial { } spec = deepMove(spec, 'spec.ioConfig.firehose', 'spec.ioConfig.inputSource'); - spec = deepMove( - spec, - 'spec.dataSchema.parser.parseSpec.timestampSpec', - 'spec.dataSchema.timestampSpec', - ); - spec = deepMove( - spec, - 'spec.dataSchema.parser.parseSpec.dimensionsSpec', - 'spec.dataSchema.dimensionsSpec', - ); - spec = deepMove(spec, 'spec.dataSchema.parser.parseSpec', 'spec.ioConfig.inputFormat'); - spec = deepDelete(spec, 'spec.dataSchema.parser'); - spec = deepMove(spec, 'spec.ioConfig.inputFormat.format', 'spec.ioConfig.inputFormat.type'); } - return spec; -} -export function downgradeSpec(spec: Partial): Partial { - if (deepGet(spec, 'spec.ioConfig.inputSource')) { - spec = deepMove(spec, 'spec.ioConfig.inputFormat.type', 'spec.ioConfig.inputFormat.format'); - spec = deepSet(spec, 'spec.dataSchema.parser', { type: 'string' }); - spec = deepMove(spec, 'spec.ioConfig.inputFormat', 'spec.dataSchema.parser.parseSpec'); - spec = deepMove( - spec, - 'spec.dataSchema.dimensionsSpec', - 'spec.dataSchema.parser.parseSpec.dimensionsSpec', - ); - spec = deepMove( - spec, - 'spec.dataSchema.timestampSpec', - 'spec.dataSchema.parser.parseSpec.timestampSpec', - ); - spec = deepMove(spec, 'spec.ioConfig.inputSource', 'spec.ioConfig.firehose'); + spec = deepMove( + spec, + 'spec.dataSchema.parser.parseSpec.timestampSpec', + 'spec.dataSchema.timestampSpec', + ); + spec = deepMove( + spec, + 'spec.dataSchema.parser.parseSpec.dimensionsSpec', + 'spec.dataSchema.dimensionsSpec', + ); + spec = deepMove(spec, 'spec.dataSchema.parser.parseSpec', 'spec.ioConfig.inputFormat'); + spec = deepDelete(spec, 'spec.dataSchema.parser'); + spec = deepMove(spec, 'spec.ioConfig.inputFormat.format', 'spec.ioConfig.inputFormat.type'); - switch (deepGet(spec, 'spec.ioConfig.firehose.type')) { - case 's3': - deepSet(spec, 'spec.ioConfig.firehose.type', 'static-s3'); - break; - - case 'google': - deepSet(spec, 'spec.ioConfig.firehose.type', 'static-google-blobstore'); - deepMove(spec, 'spec.ioConfig.firehose.objects', 'spec.ioConfig.firehose.blobs'); - break; - } - } return spec; }