improve spec upgrading (#12072)

This commit is contained in:
Vadim Ogievetsky 2021-12-15 10:28:21 -08:00 committed by GitHub
parent 3f79453506
commit 0cc998d8a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 257 additions and 172 deletions

View File

@ -1,77 +0,0 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`ingestion-spec upgrades 1`] = `
Object {
"spec": Object {
"dataSchema": Object {
"dataSource": "wikipedia",
"dimensionsSpec": Object {
"dimensions": Array [
"channel",
"cityName",
"comment",
],
},
"granularitySpec": Object {
"queryGranularity": "hour",
"rollup": true,
"segmentGranularity": "day",
},
"metricsSpec": Array [
Object {
"name": "count",
"type": "count",
},
Object {
"fieldName": "added",
"name": "sum_added",
"type": "longSum",
},
],
"timestampSpec": Object {
"column": "timestamp",
"format": "iso",
},
"transformSpec": Object {
"filter": Object {
"dimension": "commentLength",
"type": "selector",
"value": "35",
},
"transforms": Array [
Object {
"expression": "concat(\\"channel\\", 'lol')",
"name": "channel",
"type": "expression",
},
],
},
},
"ioConfig": Object {
"inputFormat": Object {
"flattenSpec": Object {
"fields": Array [
Object {
"expr": "$.cityName",
"name": "cityNameAlt",
"type": "path",
},
],
},
"type": "json",
},
"inputSource": Object {
"type": "http",
"uris": Array [
"https://static.imply.io/data/wikipedia.json.gz",
],
},
"type": "index_parallel",
},
"tuningConfig": Object {
"type": "index_parallel",
},
},
"type": "index_parallel",
}
`;

View File

@ -19,7 +19,6 @@
import {
adjustId,
cleanSpec,
downgradeSpec,
getColumnTypeFromHeaderAndRows,
guessInputFormat,
guessTypeFromSample,
@ -29,60 +28,164 @@ import {
} from './ingestion-spec';
describe('ingestion-spec', () => {
const oldSpec = {
type: 'index_parallel',
spec: {
ioConfig: {
type: 'index_parallel',
firehose: {
type: 'http',
uris: ['https://static.imply.io/data/wikipedia.json.gz'],
it('upgrades / downgrades task spec', () => {
const oldTaskSpec = {
type: 'index_parallel',
spec: {
ioConfig: {
type: 'index_parallel',
firehose: {
type: 'http',
uris: ['https://static.imply.io/data/wikipedia.json.gz'],
},
},
tuningConfig: {
type: 'index_parallel',
},
dataSchema: {
dataSource: 'wikipedia',
granularitySpec: {
segmentGranularity: 'day',
queryGranularity: 'hour',
rollup: true,
},
parser: {
type: 'string',
parseSpec: {
format: 'json',
timestampSpec: {
column: 'timestamp',
format: 'iso',
},
dimensionsSpec: {
dimensions: ['channel', 'cityName', 'comment'],
},
flattenSpec: {
fields: [
{
type: 'path',
name: 'cityNameAlt',
expr: '$.cityName',
},
],
},
},
},
transformSpec: {
transforms: [
{
type: 'expression',
name: 'channel',
expression: 'concat("channel", \'lol\')',
},
],
filter: {
type: 'selector',
dimension: 'commentLength',
value: '35',
},
},
metricsSpec: [
{
name: 'count',
type: 'count',
},
{
name: 'sum_added',
type: 'longSum',
fieldName: 'added',
},
],
},
},
tuningConfig: {
type: 'index_parallel',
};
expect(upgradeSpec(oldTaskSpec)).toEqual({
spec: {
dataSchema: {
dataSource: 'wikipedia',
dimensionsSpec: {
dimensions: ['channel', 'cityName', 'comment'],
},
granularitySpec: {
queryGranularity: 'hour',
rollup: true,
segmentGranularity: 'day',
},
metricsSpec: [
{
name: 'count',
type: 'count',
},
{
fieldName: 'added',
name: 'sum_added',
type: 'longSum',
},
],
timestampSpec: {
column: 'timestamp',
format: 'iso',
},
transformSpec: {
filter: {
dimension: 'commentLength',
type: 'selector',
value: '35',
},
transforms: [
{
expression: 'concat("channel", \'lol\')',
name: 'channel',
type: 'expression',
},
],
},
},
ioConfig: {
inputFormat: {
flattenSpec: {
fields: [
{
expr: '$.cityName',
name: 'cityNameAlt',
type: 'path',
},
],
},
type: 'json',
},
inputSource: {
type: 'http',
uris: ['https://static.imply.io/data/wikipedia.json.gz'],
},
type: 'index_parallel',
},
tuningConfig: {
type: 'index_parallel',
},
},
type: 'index_parallel',
});
});
it('upgrades / downgrades supervisor spec', () => {
const oldSupervisorSpec = {
type: 'kafka',
dataSchema: {
dataSource: 'wikipedia',
granularitySpec: {
segmentGranularity: 'day',
queryGranularity: 'hour',
rollup: true,
},
dataSource: 'metrics-kafka',
parser: {
type: 'string',
parseSpec: {
format: 'json',
timestampSpec: {
column: 'timestamp',
format: 'iso',
format: 'auto',
},
dimensionsSpec: {
dimensions: ['channel', 'cityName', 'comment'],
dimensions: [],
dimensionExclusions: ['timestamp', 'value'],
},
flattenSpec: {
fields: [
{
type: 'path',
name: 'cityNameAlt',
expr: '$.cityName',
},
],
},
},
},
transformSpec: {
transforms: [
{
type: 'expression',
name: 'channel',
expression: 'concat("channel", \'lol\')',
},
],
filter: {
type: 'selector',
dimension: 'commentLength',
value: '35',
},
},
metricsSpec: [
@ -91,21 +194,100 @@ describe('ingestion-spec', () => {
type: 'count',
},
{
name: 'sum_added',
type: 'longSum',
fieldName: 'added',
name: 'value_sum',
fieldName: 'value',
type: 'doubleSum',
},
{
name: 'value_min',
fieldName: 'value',
type: 'doubleMin',
},
{
name: 'value_max',
fieldName: 'value',
type: 'doubleMax',
},
],
granularitySpec: {
type: 'uniform',
segmentGranularity: 'HOUR',
queryGranularity: 'NONE',
},
},
},
};
tuningConfig: {
type: 'kafka',
maxRowsPerSegment: 5000000,
},
ioConfig: {
topic: 'metrics',
consumerProperties: {
'bootstrap.servers': 'localhost:9092',
},
taskCount: 1,
replicas: 1,
taskDuration: 'PT1H',
},
};
it('upgrades', () => {
expect(upgradeSpec(oldSpec)).toMatchSnapshot();
});
it('round trips', () => {
expect(downgradeSpec(upgradeSpec(oldSpec))).toMatchObject(oldSpec);
expect(upgradeSpec(oldSupervisorSpec)).toEqual({
spec: {
dataSchema: {
dataSource: 'metrics-kafka',
dimensionsSpec: {
dimensionExclusions: ['timestamp', 'value'],
dimensions: [],
},
granularitySpec: {
queryGranularity: 'NONE',
segmentGranularity: 'HOUR',
type: 'uniform',
},
metricsSpec: [
{
name: 'count',
type: 'count',
},
{
fieldName: 'value',
name: 'value_sum',
type: 'doubleSum',
},
{
fieldName: 'value',
name: 'value_min',
type: 'doubleMin',
},
{
fieldName: 'value',
name: 'value_max',
type: 'doubleMax',
},
],
timestampSpec: {
column: 'timestamp',
format: 'auto',
},
},
ioConfig: {
consumerProperties: {
'bootstrap.servers': 'localhost:9092',
},
inputFormat: {
type: 'json',
},
replicas: 1,
taskCount: 1,
taskDuration: 'PT1H',
topic: 'metrics',
},
tuningConfig: {
maxRowsPerSegment: 5000000,
type: 'kafka',
},
},
type: 'kafka',
});
});
it('cleanSpec', () => {

View File

@ -2249,6 +2249,15 @@ export function updateSchemaWithSample(
// ------------------------
export function upgradeSpec(spec: any): Partial<IngestionSpec> {
if (deepGet(spec, 'type') && deepGet(spec, 'dataSchema')) {
spec = {
type: spec.type,
spec: deepDelete(spec, 'type'),
};
}
if (!deepGet(spec, 'spec.dataSchema.parser')) return spec;
if (deepGet(spec, 'spec.ioConfig.firehose')) {
switch (deepGet(spec, 'spec.ioConfig.firehose.type')) {
case 'static-s3':
@ -2262,51 +2271,22 @@ export function upgradeSpec(spec: any): Partial<IngestionSpec> {
}
spec = deepMove(spec, 'spec.ioConfig.firehose', 'spec.ioConfig.inputSource');
spec = deepMove(
spec,
'spec.dataSchema.parser.parseSpec.timestampSpec',
'spec.dataSchema.timestampSpec',
);
spec = deepMove(
spec,
'spec.dataSchema.parser.parseSpec.dimensionsSpec',
'spec.dataSchema.dimensionsSpec',
);
spec = deepMove(spec, 'spec.dataSchema.parser.parseSpec', 'spec.ioConfig.inputFormat');
spec = deepDelete(spec, 'spec.dataSchema.parser');
spec = deepMove(spec, 'spec.ioConfig.inputFormat.format', 'spec.ioConfig.inputFormat.type');
}
return spec;
}
export function downgradeSpec(spec: Partial<IngestionSpec>): Partial<IngestionSpec> {
if (deepGet(spec, 'spec.ioConfig.inputSource')) {
spec = deepMove(spec, 'spec.ioConfig.inputFormat.type', 'spec.ioConfig.inputFormat.format');
spec = deepSet(spec, 'spec.dataSchema.parser', { type: 'string' });
spec = deepMove(spec, 'spec.ioConfig.inputFormat', 'spec.dataSchema.parser.parseSpec');
spec = deepMove(
spec,
'spec.dataSchema.dimensionsSpec',
'spec.dataSchema.parser.parseSpec.dimensionsSpec',
);
spec = deepMove(
spec,
'spec.dataSchema.timestampSpec',
'spec.dataSchema.parser.parseSpec.timestampSpec',
);
spec = deepMove(spec, 'spec.ioConfig.inputSource', 'spec.ioConfig.firehose');
spec = deepMove(
spec,
'spec.dataSchema.parser.parseSpec.timestampSpec',
'spec.dataSchema.timestampSpec',
);
spec = deepMove(
spec,
'spec.dataSchema.parser.parseSpec.dimensionsSpec',
'spec.dataSchema.dimensionsSpec',
);
spec = deepMove(spec, 'spec.dataSchema.parser.parseSpec', 'spec.ioConfig.inputFormat');
spec = deepDelete(spec, 'spec.dataSchema.parser');
spec = deepMove(spec, 'spec.ioConfig.inputFormat.format', 'spec.ioConfig.inputFormat.type');
switch (deepGet(spec, 'spec.ioConfig.firehose.type')) {
case 's3':
deepSet(spec, 'spec.ioConfig.firehose.type', 'static-s3');
break;
case 'google':
deepSet(spec, 'spec.ioConfig.firehose.type', 'static-google-blobstore');
deepMove(spec, 'spec.ioConfig.firehose.objects', 'spec.ioConfig.firehose.blobs');
break;
}
}
return spec;
}