reindex flow should take order from Druid (#12790)

This commit is contained in:
Vadim Ogievetsky 2022-07-14 20:03:33 -07:00 committed by GitHub
parent 1e0542626b
commit f2a7970a6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 23 deletions

View File

@ -65,9 +65,10 @@ export interface SampleResponse {
export type CacheRows = Record<string, any>[];
export interface SampleResponseWithExtraInfo extends SampleResponse {
rollup?: boolean;
columns?: Record<string, any>;
columns?: string[];
columnInfo?: Record<string, any>;
aggregators?: Record<string, any>;
rollup?: boolean;
}
export interface SampleEntry {
@ -280,23 +281,41 @@ export async function sampleForConnect(
if (!samplerResponse.data.length) return samplerResponse;
if (reingestMode) {
const dataSource = deepGet(ioConfig, 'inputSource.dataSource');
const intervals = deepGet(ioConfig, 'inputSource.interval');
const scanResponse = await queryDruidRune({
queryType: 'scan',
dataSource,
intervals,
resultFormat: 'compactedList',
limit: 1,
columns: [],
granularity: 'all',
});
const columns = deepGet(scanResponse, '0.columns');
if (!Array.isArray(columns)) {
throw new Error(`unexpected response from scan query`);
}
samplerResponse.columns = columns;
const segmentMetadataResponse = await queryDruidRune({
queryType: 'segmentMetadata',
dataSource: deepGet(ioConfig, 'inputSource.dataSource'),
intervals: [deepGet(ioConfig, 'inputSource.interval')],
dataSource,
intervals,
merge: true,
lenientAggregatorMerge: true,
analysisTypes: ['aggregators', 'rollup'],
});
if (Array.isArray(segmentMetadataResponse) && segmentMetadataResponse.length === 1) {
const segmentMetadataResponse0 = segmentMetadataResponse[0];
samplerResponse.rollup = segmentMetadataResponse0.rollup;
samplerResponse.columns = segmentMetadataResponse0.columns;
samplerResponse.aggregators = segmentMetadataResponse0.aggregators;
} else {
if (!Array.isArray(segmentMetadataResponse) || segmentMetadataResponse.length !== 1) {
throw new Error(`unexpected response from segmentMetadata query`);
}
const segmentMetadataResponse0 = segmentMetadataResponse[0];
samplerResponse.rollup = segmentMetadataResponse0.rollup;
samplerResponse.columnInfo = segmentMetadataResponse0.columns;
samplerResponse.aggregators = segmentMetadataResponse0.aggregators;
}
return samplerResponse;

View File

@ -1290,21 +1290,22 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
newSpec = deepSet(
newSpec,
'spec.dataSchema.dimensionsSpec.dimensions',
Object.keys(inputData.columns)
.filter(k => k !== TIME_COLUMN && !aggregators[k])
.map(k => ({
name: k,
type: String(inputData.columns![k].type || 'string').toLowerCase(),
})),
filterMap(inputData.columns, column => {
if (column === TIME_COLUMN || aggregators[column]) return;
return {
name: column,
type: String(inputData.columnInfo?.[column]?.type || 'string').toLowerCase(),
};
}),
);
}
if (inputData.aggregators) {
newSpec = deepSet(
newSpec,
'spec.dataSchema.metricsSpec',
Object.values(inputData.aggregators),
);
if (inputData.aggregators) {
newSpec = deepSet(
newSpec,
'spec.dataSchema.metricsSpec',
filterMap(inputData.columns, column => aggregators[column]),
);
}
}
this.updateSpec(fillDataSourceNameIfNeeded(newSpec));