mirror of https://github.com/apache/druid.git
only pick kafka input format by default when needed (#16180)
This commit is contained in:
parent
a818b8acb6
commit
06268bf060
|
@ -24,6 +24,7 @@ import {
|
|||
cleanSpec,
|
||||
guessColumnTypeFromInput,
|
||||
guessColumnTypeFromSampleResponse,
|
||||
guessKafkaInputFormat,
|
||||
guessSimpleInputFormat,
|
||||
updateSchemaWithSample,
|
||||
upgradeSpec,
|
||||
|
@ -669,6 +670,36 @@ describe('ingestion-spec', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('guessKafkaInputFormat', () => {
|
||||
const sample = [
|
||||
{
|
||||
'kafka.timestamp': 1710962988515,
|
||||
'kafka.topic': 'kttm2',
|
||||
'raw':
|
||||
'{"timestamp":"2019-08-25T00:00:00.031Z","session":"S56194838","number":"16","event":{"type":"PercentClear","percentage":55},"agent":{"type":"Browser","category":"Personal computer","browser":"Chrome","browser_version":"76.0.3809.100","os":"Windows 7","platform":"Windows"},"client_ip":"181.13.41.82","geo_ip":{"continent":"South America","country":"Argentina","region":"Santa Fe","city":"Rosario"},"language":["es","es-419"],"adblock_list":"NoAdblock","app_version":"1.9.6","path":"http://www.koalastothemax.com/","loaded_image":"http://www.koalastothemax.com/img/koalas2.jpg","referrer":"Direct","referrer_host":"Direct","server_ip":"172.31.57.89","screen":"1680x1050","window":"1680x939","session_length":76261,"timezone":"N/A","timezone_offset":"180"}',
|
||||
},
|
||||
{
|
||||
'kafka.timestamp': 1710962988518,
|
||||
'kafka.topic': 'kttm2',
|
||||
'raw':
|
||||
'{"timestamp":"2019-08-25T00:00:00.059Z","session":"S46093731","number":"24","event":{"type":"PercentClear","percentage":85},"agent":{"type":"Mobile Browser","category":"Smartphone","browser":"Chrome Mobile","browser_version":"50.0.2661.89","os":"Android","platform":"Android"},"client_ip":"177.242.100.0","geo_ip":{"continent":"North America","country":"Mexico","region":"Chihuahua","city":"Nuevo Casas Grandes"},"language":["en","es","es-419","es-MX"],"adblock_list":"NoAdblock","app_version":"1.9.6","path":"https://koalastothemax.com/","loaded_image":"https://koalastothemax.com/img/koalas1.jpg","referrer":"https://www.google.com/","referrer_host":"www.google.com","server_ip":"172.31.11.5","screen":"320x570","window":"540x743","session_length":252689,"timezone":"CDT","timezone_offset":"300"}',
|
||||
},
|
||||
];
|
||||
|
||||
it('works when single topic', () => {
|
||||
expect(guessKafkaInputFormat(sample, false)).toEqual({ type: 'json' });
|
||||
});
|
||||
|
||||
it('works when multi-topic', () => {
|
||||
expect(guessKafkaInputFormat(sample, true)).toEqual({
|
||||
type: 'kafka',
|
||||
valueFormat: {
|
||||
type: 'json',
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('spec utils', () => {
|
||||
|
|
|
@ -2418,7 +2418,10 @@ export function fillInputFormatIfNeeded(
|
|||
spec,
|
||||
'spec.ioConfig.inputFormat',
|
||||
getSpecType(spec) === 'kafka'
|
||||
? guessKafkaInputFormat(filterMap(sampleResponse.data, l => l.input))
|
||||
? guessKafkaInputFormat(
|
||||
filterMap(sampleResponse.data, l => l.input),
|
||||
typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string',
|
||||
)
|
||||
: guessSimpleInputFormat(
|
||||
filterMap(sampleResponse.data, l => l.input?.raw),
|
||||
isStreamingSpec(spec),
|
||||
|
@ -2430,15 +2433,27 @@ function noNumbers(xs: string[]): boolean {
|
|||
return xs.every(x => isNaN(Number(x)));
|
||||
}
|
||||
|
||||
export function guessKafkaInputFormat(sampleRaw: Record<string, any>[]): InputFormat {
|
||||
export function guessKafkaInputFormat(
|
||||
sampleRaw: Record<string, any>[],
|
||||
multiTopic: boolean,
|
||||
): InputFormat {
|
||||
const hasHeader = sampleRaw.some(x => Object.keys(x).some(k => k.startsWith('kafka.header.')));
|
||||
const keys = filterMap(sampleRaw, x => x['kafka.key']);
|
||||
const payloads = filterMap(sampleRaw, x => x.raw);
|
||||
const valueFormat = guessSimpleInputFormat(
|
||||
filterMap(sampleRaw, x => x.raw),
|
||||
true,
|
||||
);
|
||||
|
||||
if (!hasHeader && !keys.length && !multiTopic) {
|
||||
// No headers or keys and just a single topic means do not pick the 'kafka' format by default as it is less performant
|
||||
return valueFormat;
|
||||
}
|
||||
|
||||
return {
|
||||
type: 'kafka',
|
||||
headerFormat: hasHeader ? { type: 'string' } : undefined,
|
||||
keyFormat: keys.length ? guessSimpleInputFormat(keys, true) : undefined,
|
||||
valueFormat: guessSimpleInputFormat(payloads, true),
|
||||
valueFormat,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue