mirror of https://github.com/apache/druid.git
Web console: Data loader should allow for multiline JSON messages in kafka (#13709)
* stricter * data loader should allow for multi-line json * add await * kinesis also
This commit is contained in:
parent
6164c420a1
commit
3b62d7929c
|
@ -64,6 +64,10 @@ exports.SQL_KEYWORDS = [
|
||||||
'OVER',
|
'OVER',
|
||||||
'PARTITION BY',
|
'PARTITION BY',
|
||||||
'WINDOW',
|
'WINDOW',
|
||||||
|
'RANGE',
|
||||||
|
'PRECEDING',
|
||||||
|
'FOLLOWING',
|
||||||
|
'EXTEND',
|
||||||
];
|
];
|
||||||
|
|
||||||
exports.SQL_EXPRESSION_PARTS = [
|
exports.SQL_EXPRESSION_PARTS = [
|
||||||
|
|
|
@ -2133,7 +2133,12 @@ export function updateIngestionType(
|
||||||
return newSpec;
|
return newSpec;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function issueWithSampleData(sampleData: string[]): JSX.Element | undefined {
|
export function issueWithSampleData(
|
||||||
|
sampleData: string[],
|
||||||
|
spec: Partial<IngestionSpec>,
|
||||||
|
): JSX.Element | undefined {
|
||||||
|
if (isStreamingSpec(spec)) return;
|
||||||
|
|
||||||
if (sampleData.length) {
|
if (sampleData.length) {
|
||||||
const firstData = sampleData[0];
|
const firstData = sampleData[0];
|
||||||
|
|
||||||
|
@ -2166,14 +2171,18 @@ export function fillInputFormatIfNeeded(
|
||||||
sampleData: string[],
|
sampleData: string[],
|
||||||
): Partial<IngestionSpec> {
|
): Partial<IngestionSpec> {
|
||||||
if (deepGet(spec, 'spec.ioConfig.inputFormat.type')) return spec;
|
if (deepGet(spec, 'spec.ioConfig.inputFormat.type')) return spec;
|
||||||
return deepSet(spec, 'spec.ioConfig.inputFormat', guessInputFormat(sampleData));
|
return deepSet(
|
||||||
|
spec,
|
||||||
|
'spec.ioConfig.inputFormat',
|
||||||
|
guessInputFormat(sampleData, isStreamingSpec(spec)),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
function noNumbers(xs: string[]): boolean {
|
function noNumbers(xs: string[]): boolean {
|
||||||
return xs.every(x => isNaN(Number(x)));
|
return xs.every(x => isNaN(Number(x)));
|
||||||
}
|
}
|
||||||
|
|
||||||
export function guessInputFormat(sampleData: string[]): InputFormat {
|
export function guessInputFormat(sampleData: string[], canBeMultiLineJson = false): InputFormat {
|
||||||
let sampleDatum = sampleData[0];
|
let sampleDatum = sampleData[0];
|
||||||
if (sampleDatum) {
|
if (sampleDatum) {
|
||||||
sampleDatum = String(sampleDatum); // Really ensure it is a string
|
sampleDatum = String(sampleDatum); // Really ensure it is a string
|
||||||
|
@ -2261,6 +2270,11 @@ export function guessInputFormat(sampleData: string[]): InputFormat {
|
||||||
numColumns: lineAsTsvPipe.length,
|
numColumns: lineAsTsvPipe.length,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the object is a single json object spanning multiple lines then the first one will just start with `{`
|
||||||
|
if (canBeMultiLineJson && sampleDatum.startsWith('{')) {
|
||||||
|
return { type: 'json', useJsonNodeReader: true };
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return inputFormatFromType({ type: 'regex' });
|
return inputFormatFromType({ type: 'regex' });
|
||||||
|
|
|
@ -114,6 +114,7 @@ function generateInputFormatFields(streaming: boolean) {
|
||||||
streaming
|
streaming
|
||||||
? {
|
? {
|
||||||
name: 'useJsonNodeReader',
|
name: 'useJsonNodeReader',
|
||||||
|
title: 'Use JSON node reader',
|
||||||
type: 'boolean',
|
type: 'boolean',
|
||||||
defined: typeIs('json'),
|
defined: typeIs('json'),
|
||||||
disabled: (inputFormat: InputFormat) => inputFormat.assumeNewlineDelimited,
|
disabled: (inputFormat: InputFormat) => inputFormat.assumeNewlineDelimited,
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
import axios, { AxiosError, AxiosInstance, AxiosRequestConfig } from 'axios';
|
import axios, { AxiosError, AxiosInstance, AxiosRequestConfig } from 'axios';
|
||||||
import * as JSONBig from 'json-bigint-native';
|
import * as JSONBig from 'json-bigint-native';
|
||||||
|
|
||||||
|
import { nonEmptyString } from '../utils';
|
||||||
|
|
||||||
export class Api {
|
export class Api {
|
||||||
static instance: AxiosInstance;
|
static instance: AxiosInstance;
|
||||||
|
|
||||||
|
@ -32,11 +34,11 @@ export class Api {
|
||||||
(error: AxiosError) => {
|
(error: AxiosError) => {
|
||||||
const responseData = error.response?.data;
|
const responseData = error.response?.data;
|
||||||
const message = responseData?.message;
|
const message = responseData?.message;
|
||||||
if (typeof message === 'string') {
|
if (nonEmptyString(message)) {
|
||||||
return Promise.reject(new Error(message));
|
return Promise.reject(new Error(message));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (error.config.method?.toLowerCase() === 'get' && typeof responseData === 'string') {
|
if (error.config.method?.toLowerCase() === 'get' && nonEmptyString(responseData)) {
|
||||||
return Promise.reject(new Error(responseData));
|
return Promise.reject(new Error(responseData));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,11 @@ export function isNumberLikeNaN(x: NumberLike): boolean {
|
||||||
return isNaN(Number(x));
|
return isNaN(Number(x));
|
||||||
}
|
}
|
||||||
|
|
||||||
export function nonEmptyArray(a: any): a is unknown[] {
|
export function nonEmptyString(s: unknown): s is string {
|
||||||
|
return typeof s === 'string' && s !== '';
|
||||||
|
}
|
||||||
|
|
||||||
|
export function nonEmptyArray(a: unknown): a is unknown[] {
|
||||||
return Array.isArray(a) && Boolean(a.length);
|
return Array.isArray(a) && Boolean(a.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,7 +76,7 @@ export interface TasksCardProps {
|
||||||
export const TasksCard = React.memo(function TasksCard(props: TasksCardProps) {
|
export const TasksCard = React.memo(function TasksCard(props: TasksCardProps) {
|
||||||
const [cardState] = useQueryManager<Capabilities, TaskCountsAndCapacity>({
|
const [cardState] = useQueryManager<Capabilities, TaskCountsAndCapacity>({
|
||||||
processQuery: async capabilities => {
|
processQuery: async capabilities => {
|
||||||
const taskCounts = getTaskCounts(capabilities);
|
const taskCounts = await getTaskCounts(capabilities);
|
||||||
if (!capabilities.hasOverlordAccess()) return taskCounts;
|
if (!capabilities.hasOverlordAccess()) return taskCounts;
|
||||||
|
|
||||||
const capacity = await getClusterCapacity();
|
const capacity = await getClusterCapacity();
|
||||||
|
|
|
@ -1364,7 +1364,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
|
||||||
l.input ? l.input.raw : undefined,
|
l.input ? l.input.raw : undefined,
|
||||||
);
|
);
|
||||||
|
|
||||||
const issue = issueWithSampleData(sampleLines);
|
const issue = issueWithSampleData(sampleLines, spec);
|
||||||
if (issue) {
|
if (issue) {
|
||||||
AppToaster.show({
|
AppToaster.show({
|
||||||
icon: IconNames.WARNING_SIGN,
|
icon: IconNames.WARNING_SIGN,
|
||||||
|
|
Loading…
Reference in New Issue