Web console: add expectedLoadTimeMillis (#17359)

* add expectedLoadTimeMillis

* make spec cleaning less agro

* more cleanup
This commit is contained in:
Vadim Ogievetsky 2024-10-16 13:14:27 -07:00 committed by GitHub
parent 8ddb316e68
commit 877784e5fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 373 additions and 254 deletions

View File

@ -269,18 +269,23 @@ ORDER BY "start" DESC`;
};
this.dataQueryManager = new QueryManager({
processQuery: async ({ capabilities, dateRange }) => {
processQuery: async ({ capabilities, dateRange }, cancelToken) => {
let intervals: IntervalRow[];
let datasources: string[];
if (capabilities.hasSql()) {
intervals = await queryDruidSql({
intervals = await queryDruidSql(
{
query: SegmentTimeline.getSqlQuery(dateRange),
});
},
cancelToken,
);
datasources = uniq(intervals.map(r => r.datasource).sort());
} else if (capabilities.hasCoordinatorAccess()) {
const startIso = dateRange[0].toISOString();
datasources = (await Api.instance.get(`/druid/coordinator/v1/datasources`)).data;
datasources = (
await Api.instance.get(`/druid/coordinator/v1/datasources`, { cancelToken })
).data;
intervals = (
await Promise.all(
datasources.map(async datasource => {
@ -289,6 +294,7 @@ ORDER BY "start" DESC`;
`/druid/coordinator/v1/datasources/${Api.encodePath(
datasource,
)}/intervals?simple`,
{ cancelToken },
)
).data;

View File

@ -39,8 +39,8 @@ export const ShowJson = React.memo(function ShowJson(props: ShowJsonProps) {
const { endpoint, transform, downloadFilename } = props;
const [jsonState] = useQueryManager<null, string>({
processQuery: async () => {
const resp = await Api.instance.get(endpoint);
processQuery: async (_, cancelToken) => {
const resp = await Api.instance.get(endpoint, { cancelToken });
let data = resp.data;
if (transform) data = transform(data);
return typeof data === 'string' ? data : JSONBig.stringify(data, undefined, 2);

View File

@ -62,10 +62,11 @@ export class ShowLog extends React.PureComponent<ShowLogProps, ShowLogState> {
};
this.showLogQueryManager = new QueryManager({
processQuery: async () => {
processQuery: async (_, cancelToken) => {
const { endpoint, tailOffset } = this.props;
const resp = await Api.instance.get(
endpoint + (tailOffset ? `?offset=-${tailOffset}` : ''),
{ cancelToken },
);
const data = resp.data;

View File

@ -48,9 +48,10 @@ export const SupervisorHistoryPanel = React.memo(function SupervisorHistoryPanel
const [diffIndex, setDiffIndex] = useState(-1);
const [historyState] = useQueryManager<string, SupervisorHistoryEntry[]>({
initQuery: supervisorId,
processQuery: async supervisorId => {
processQuery: async (supervisorId, cancelToken) => {
const resp = await Api.instance.get(
`/druid/indexer/v1/supervisor/${Api.encodePath(supervisorId)}/history`,
{ cancelToken },
);
return resp.data.map((vs: SupervisorHistoryEntry) => deepSet(vs, 'spec', cleanSpec(vs.spec)));
},

View File

@ -64,9 +64,11 @@ export const CompactionDynamicConfigDialog = React.memo(function CompactionDynam
useQueryManager<null, Record<string, any>>({
initQuery: null,
processQuery: async () => {
processQuery: async (_, cancelToken) => {
try {
const c = (await Api.instance.get('/druid/coordinator/v1/config/compaction')).data;
const c = (
await Api.instance.get('/druid/coordinator/v1/config/compaction', { cancelToken })
).data;
setDynamicConfig({
compactionTaskSlotRatio: c.compactionTaskSlotRatio ?? DEFAULT_RATIO,
maxCompactionTaskSlots: c.maxCompactionTaskSlots ?? DEFAULT_MAX,

View File

@ -63,10 +63,11 @@ export const CompactionHistoryDialog = React.memo(function CompactionHistoryDial
const [diffIndex, setDiffIndex] = useState(-1);
const [historyState] = useQueryManager<string, CompactionHistoryEntry[]>({
initQuery: datasource,
processQuery: async datasource => {
processQuery: async (datasource, cancelToken) => {
try {
const resp = await Api.instance.get(
`/druid/coordinator/v1/config/compaction/${Api.encodePath(datasource)}/history?count=20`,
{ cancelToken },
);
return resp.data;
} catch (e) {

View File

@ -46,17 +46,19 @@ export const CoordinatorDynamicConfigDialog = React.memo(function CoordinatorDyn
const [historyRecordsState] = useQueryManager<null, any[]>({
initQuery: null,
processQuery: async () => {
const historyResp = await Api.instance.get(`/druid/coordinator/v1/config/history?count=100`);
processQuery: async (_, cancelToken) => {
const historyResp = await Api.instance.get(`/druid/coordinator/v1/config/history?count=100`, {
cancelToken,
});
return historyResp.data;
},
});
useQueryManager<null, Record<string, any>>({
initQuery: null,
processQuery: async () => {
processQuery: async (_, cancelToken) => {
try {
const configResp = await Api.instance.get('/druid/coordinator/v1/config');
const configResp = await Api.instance.get('/druid/coordinator/v1/config', { cancelToken });
setDynamicConfig(configResp.data || {});
} catch (e) {
AppToaster.show({

View File

@ -42,11 +42,14 @@ export const DatasourceColumnsTable = React.memo(function DatasourceColumnsTable
) {
const [columnsState] = useQueryManager<string, DatasourceColumnsTableRow[]>({
initQuery: props.datasource,
processQuery: async (datasourceId: string) => {
return await queryDruidSql<ColumnMetadata>({
processQuery: async (datasourceId, cancelToken) => {
return await queryDruidSql<ColumnMetadata>(
{
query: `SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME = ${L(datasourceId)}`,
});
},
cancelToken,
);
},
});

View File

@ -41,10 +41,13 @@ export const LookupValuesTable = React.memo(function LookupValuesTable(
props: LookupValuesTableProps,
) {
const [entriesState] = useQueryManager<string, LookupRow[]>({
processQuery: async (lookupId: string) => {
return await queryDruidSql<LookupRow>({
processQuery: async (lookupId, cancelToken) => {
return await queryDruidSql<LookupRow>(
{
query: `SELECT "k", "v" FROM ${N('lookup').table(lookupId)} LIMIT 5000`,
});
},
cancelToken,
);
},
initQuery: props.lookupId,
});

View File

@ -46,17 +46,19 @@ export const OverlordDynamicConfigDialog = React.memo(function OverlordDynamicCo
const [historyRecordsState] = useQueryManager<null, any[]>({
initQuery: null,
processQuery: async () => {
const historyResp = await Api.instance.get(`/druid/indexer/v1/worker/history?count=100`);
processQuery: async (_, cancelToken) => {
const historyResp = await Api.instance.get(`/druid/indexer/v1/worker/history?count=100`, {
cancelToken,
});
return historyResp.data;
},
});
useQueryManager<null, Record<string, any>>({
initQuery: null,
processQuery: async () => {
processQuery: async (_, cancelToken) => {
try {
const configResp = await Api.instance.get(`/druid/indexer/v1/worker`);
const configResp = await Api.instance.get(`/druid/indexer/v1/worker`, { cancelToken });
setDynamicConfig(configResp.data || {});
} catch (e) {
AppToaster.show({

View File

@ -52,19 +52,24 @@ export const RetentionDialog = React.memo(function RetentionDialog(props: Retent
const [tiersState] = useQueryManager<Capabilities, string[]>({
initQuery: capabilities,
processQuery: async capabilities => {
processQuery: async (capabilities, cancelToken) => {
if (capabilities.hasSql()) {
const sqlResp = await queryDruidSql<{ tier: string }>({
const sqlResp = await queryDruidSql<{ tier: string }>(
{
query: `SELECT "tier"
FROM "sys"."servers"
WHERE "server_type" = 'historical'
GROUP BY 1
ORDER BY 1`,
});
},
cancelToken,
);
return sqlResp.map(d => d.tier);
} else if (capabilities.hasCoordinatorAccess()) {
const allServiceResp = await Api.instance.get('/druid/coordinator/v1/servers?simple');
const allServiceResp = await Api.instance.get('/druid/coordinator/v1/servers?simple', {
cancelToken,
});
return filterMap(allServiceResp.data, (s: any) =>
s.type === 'historical' ? s.tier : undefined,
);
@ -78,9 +83,10 @@ ORDER BY 1`,
const [historyQueryState] = useQueryManager<string, any[]>({
initQuery: props.datasource,
processQuery: async datasource => {
processQuery: async (datasource, cancelToken) => {
const historyResp = await Api.instance.get(
`/druid/coordinator/v1/rules/${Api.encodePath(datasource)}/history?count=200`,
{ cancelToken },
);
return historyResp.data;
},

View File

@ -49,8 +49,8 @@ export const StatusDialog = React.memo(function StatusDialog(props: StatusDialog
const [responseState] = useQueryManager<null, StatusResponse>({
initQuery: null,
processQuery: async () => {
const resp = await Api.instance.get(`/status`);
processQuery: async (_, cancelToken) => {
const resp = await Api.instance.get(`/status`, { cancelToken });
return resp.data;
},
});

View File

@ -104,9 +104,10 @@ export const SupervisorResetOffsetsDialog = React.memo(function SupervisorResetO
const [statusResp] = useQueryManager<string, SupervisorStatus>({
initQuery: supervisorId,
processQuery: async supervisorId => {
processQuery: async (supervisorId, cancelToken) => {
const statusResp = await Api.instance.get(
`/druid/indexer/v1/supervisor/${Api.encodePath(supervisorId)}/status`,
{ cancelToken },
);
return statusResp.data;
},

View File

@ -60,8 +60,8 @@ export const SupervisorStatisticsTable = React.memo(function SupervisorStatistic
SupervisorStatisticsTableRow[]
>({
initQuery: null,
processQuery: async () => {
const resp = await Api.instance.get<SupervisorStats>(statsEndpoint);
processQuery: async (_, cancelToken) => {
const resp = await Api.instance.get<SupervisorStats>(statsEndpoint, { cancelToken });
return normalizeSupervisorStatisticsResults(resp.data);
},
});

View File

@ -27,12 +27,12 @@ import { AutoForm, ExternalLink } from '../../components';
import { IndexSpecDialog } from '../../dialogs/index-spec-dialog/index-spec-dialog';
import { getLink } from '../../links';
import {
allowKeys,
deepDelete,
deepGet,
deepMove,
deepSet,
deepSetIfUnset,
deleteKeys,
EMPTY_ARRAY,
EMPTY_OBJECT,
filterMap,
@ -79,6 +79,11 @@ export interface IngestionSpec {
readonly spec: IngestionSpecInner;
readonly context?: { useConcurrentLocks?: boolean };
readonly suspended?: boolean;
// Added by the server
readonly id?: string;
readonly groupId?: string;
readonly resource?: any;
}
export interface IngestionSpecInner {
@ -490,11 +495,37 @@ export function normalizeSpec(spec: Partial<IngestionSpec>): IngestionSpec {
}
/**
* Make sure that any extra junk in the spec other than 'type', 'spec', and 'context' is removed
* This function cleans a spec that was returned by the server so that it can be re-opened in the data loader to be
* submitted again.
* @param spec - the spec to clean
*/
export function cleanSpec(spec: Partial<IngestionSpec>): Partial<IngestionSpec> {
return allowKeys(spec, ['type', 'spec', 'context', 'suspended']) as IngestionSpec;
const specSpec = spec.spec;
// For backwards compatible reasons the contents of `spec` (`dataSchema`, `ioConfig`, and `tuningConfig`)
// can be duplicated at the top level. This function removes these duplicates (if needed) so that there is no confusion
// which is the authoritative copy.
if (
specSpec &&
specSpec.dataSchema &&
specSpec.ioConfig &&
specSpec.tuningConfig &&
(spec as any).dataSchema &&
(spec as any).ioConfig &&
(spec as any).tuningConfig
) {
spec = deleteKeys(spec, ['dataSchema', 'ioConfig', 'tuningConfig'] as any[]);
}
// Sometimes the dataSource can (redundantly) make it to the top level for some reason - delete it
if (
typeof specSpec?.dataSchema?.dataSource === 'string' &&
typeof (spec as any).dataSource === 'string'
) {
spec = deleteKeys(spec, ['dataSource'] as any[]);
}
return deleteKeys(spec, ['id', 'groupId', 'resource']);
}
export function upgradeSpec(spec: any, yolo = false): Partial<IngestionSpec> {

View File

@ -319,10 +319,13 @@ export class DruidError extends Error {
}
}
export async function queryDruidRune(runeQuery: Record<string, any>): Promise<any> {
export async function queryDruidRune(
runeQuery: Record<string, any>,
cancelToken?: CancelToken,
): Promise<any> {
let runeResultResp: AxiosResponse;
try {
runeResultResp = await Api.instance.post('/druid/v2', runeQuery);
runeResultResp = await Api.instance.post('/druid/v2', runeQuery, { cancelToken });
} catch (e) {
throw new Error(getDruidErrorMessage(e));
}

View File

@ -426,19 +426,22 @@ GROUP BY 1, 2`;
this.datasourceQueryManager = new QueryManager<DatasourceQuery, DatasourcesAndDefaultRules>({
processQuery: async (
{ capabilities, visibleColumns, showUnused },
_cancelToken,
cancelToken,
setIntermediateQuery,
) => {
let datasources: DatasourceQueryResultRow[];
if (capabilities.hasSql()) {
const query = DatasourcesView.query(visibleColumns);
setIntermediateQuery(query);
datasources = await queryDruidSql({ query });
datasources = await queryDruidSql({ query }, cancelToken);
} else if (capabilities.hasCoordinatorAccess()) {
const datasourcesResp = await Api.instance.get(
'/druid/coordinator/v1/datasources?simple',
{ cancelToken },
);
const loadstatusResp = await Api.instance.get('/druid/coordinator/v1/loadstatus?simple');
const loadstatusResp = await Api.instance.get('/druid/coordinator/v1/loadstatus?simple', {
cancelToken,
});
const loadstatus = loadstatusResp.data;
datasources = datasourcesResp.data.map((d: any): DatasourceQueryResultRow => {
const totalDataSize = deepGet(d, 'properties.segments.size') || -1;

View File

@ -31,14 +31,20 @@ export interface DatasourcesCardProps {
export const DatasourcesCard = React.memo(function DatasourcesCard(props: DatasourcesCardProps) {
const [datasourceCountState] = useQueryManager<Capabilities, number>({
processQuery: async capabilities => {
initQuery: props.capabilities,
processQuery: async (capabilities, cancelToken) => {
let datasources: any[];
if (capabilities.hasSql()) {
datasources = await queryDruidSql({
datasources = await queryDruidSql(
{
query: `SELECT datasource FROM sys.segments GROUP BY 1`,
});
},
cancelToken,
);
} else if (capabilities.hasCoordinatorAccess()) {
const datasourcesResp = await Api.instance.get('/druid/coordinator/v1/datasources');
const datasourcesResp = await Api.instance.get('/druid/coordinator/v1/datasources', {
cancelToken,
});
datasources = datasourcesResp.data;
} else {
throw new Error(`must have SQL or coordinator access`);
@ -46,7 +52,6 @@ export const DatasourcesCard = React.memo(function DatasourcesCard(props: Dataso
return datasources.length;
},
initQuery: props.capabilities,
});
return (

View File

@ -32,16 +32,18 @@ export interface LookupsCardProps {
export const LookupsCard = React.memo(function LookupsCard(props: LookupsCardProps) {
const [lookupsCountState] = useQueryManager<Capabilities, number>({
processQuery: async capabilities => {
initQuery: props.capabilities,
processQuery: async (capabilities, cancelToken) => {
if (capabilities.hasCoordinatorAccess()) {
const resp = await Api.instance.get('/druid/coordinator/v1/lookups/status');
const resp = await Api.instance.get('/druid/coordinator/v1/lookups/status', {
cancelToken,
});
const data = resp.data;
return sum(Object.keys(data).map(k => Object.keys(data[k]).length));
} else {
throw new Error(`must have coordinator access`);
}
},
initQuery: props.capabilities,
});
return (

View File

@ -40,9 +40,10 @@ export interface SegmentsCardProps {
export const SegmentsCard = React.memo(function SegmentsCard(props: SegmentsCardProps) {
const [segmentCountState] = useQueryManager<Capabilities, SegmentCounts>({
initQuery: props.capabilities,
processQuery: async capabilities => {
processQuery: async (capabilities, cancelToken) => {
if (capabilities.hasSql()) {
const segments = await queryDruidSql({
const segments = await queryDruidSql(
{
query: `SELECT
COUNT(*) AS "active",
COUNT(*) FILTER (WHERE is_available = 1) AS "cached_on_historical",
@ -50,15 +51,20 @@ export const SegmentsCard = React.memo(function SegmentsCard(props: SegmentsCard
COUNT(*) FILTER (WHERE is_realtime = 1) AS "realtime"
FROM sys.segments
WHERE is_active = 1`,
});
},
cancelToken,
);
return segments.length === 1 ? segments[0] : null;
} else if (capabilities.hasCoordinatorAccess()) {
const loadstatusResp = await Api.instance.get('/druid/coordinator/v1/loadstatus?simple');
const loadstatusResp = await Api.instance.get('/druid/coordinator/v1/loadstatus?simple', {
cancelToken,
});
const loadstatus = loadstatusResp.data;
const unavailableSegmentNum = sum(Object.keys(loadstatus), key => loadstatus[key]);
const datasourcesMetaResp = await Api.instance.get(
'/druid/coordinator/v1/datasources?simple',
{ cancelToken },
);
const datasourcesMeta = datasourcesMetaResp.data;
const availableSegmentNum = sum(datasourcesMeta, (curr: any) =>

View File

@ -43,24 +43,29 @@ export interface ServicesCardProps {
export const ServicesCard = React.memo(function ServicesCard(props: ServicesCardProps) {
const [serviceCountState] = useQueryManager<Capabilities, ServiceCounts>({
processQuery: async capabilities => {
processQuery: async (capabilities, cancelToken) => {
if (capabilities.hasSql()) {
const serviceCountsFromQuery: {
service_type: string;
count: number;
}[] = await queryDruidSql({
}[] = await queryDruidSql(
{
query: `SELECT server_type AS "service_type", COUNT(*) as "count" FROM sys.servers GROUP BY 1`,
});
},
cancelToken,
);
return lookupBy(
serviceCountsFromQuery,
x => x.service_type,
x => x.count,
);
} else if (capabilities.hasCoordinatorAccess()) {
const services = (await Api.instance.get('/druid/coordinator/v1/servers?simple')).data;
const services = (
await Api.instance.get('/druid/coordinator/v1/servers?simple', { cancelToken })
).data;
const middleManager = capabilities.hasOverlordAccess()
? (await Api.instance.get('/druid/indexer/v1/workers')).data
? (await Api.instance.get('/druid/indexer/v1/workers', { cancelToken })).data
: [];
return {

View File

@ -49,8 +49,8 @@ export const StatusCard = React.memo(function StatusCard(props: StatusCardProps)
const [showStatusDialog, setShowStatusDialog] = useState(false);
const [statusSummaryState] = useQueryManager<null, StatusSummary>({
initQuery: null,
processQuery: async () => {
const statusResp = await Api.instance.get('/status');
processQuery: async (_, cancelToken) => {
const statusResp = await Api.instance.get('/status', { cancelToken });
return {
version: statusResp.data.version,
extensionCount: statusResp.data.modules.length,
@ -60,9 +60,9 @@ export const StatusCard = React.memo(function StatusCard(props: StatusCardProps)
const [nullModeDetectionState] = useQueryManager<Capabilities, NullModeDetection>({
initQuery: capabilities,
processQuery: async capabilities => {
processQuery: async (capabilities, cancelToken) => {
if (!capabilities.hasQuerying()) return {};
const nullDetectionResponse = await queryDruidRune(NULL_DETECTION_QUERY);
const nullDetectionResponse = await queryDruidRune(NULL_DETECTION_QUERY, cancelToken);
return nullDetectionQueryResultDecoder(deepGet(nullDetectionResponse, '0.result'));
},
});

View File

@ -36,18 +36,21 @@ export interface SupervisorsCardProps {
export const SupervisorsCard = React.memo(function SupervisorsCard(props: SupervisorsCardProps) {
const [supervisorCountState] = useQueryManager<Capabilities, SupervisorCounts>({
processQuery: async capabilities => {
processQuery: async (capabilities, cancelToken) => {
if (capabilities.hasSql()) {
return (
await queryDruidSql({
await queryDruidSql(
{
query: `SELECT
COUNT(*) FILTER (WHERE "suspended" = 0) AS "running",
COUNT(*) FILTER (WHERE "suspended" = 1) AS "suspended"
FROM sys.supervisors`,
})
},
cancelToken,
)
)[0];
} else if (capabilities.hasOverlordAccess()) {
const resp = await Api.instance.get('/druid/indexer/v1/supervisor?full');
const resp = await Api.instance.get('/druid/indexer/v1/supervisor?full', { cancelToken });
const data = resp.data;
return {
running: data.filter((d: any) => d.spec.suspended === false).length,

View File

@ -17,6 +17,7 @@
*/
import { IconNames } from '@blueprintjs/icons';
import type { CancelToken } from 'axios';
import React from 'react';
import { PluralPairIfNeeded } from '../../../components';
@ -40,22 +41,28 @@ export interface TaskCounts {
waiting?: number;
}
async function getTaskCounts(capabilities: Capabilities): Promise<TaskCounts> {
async function getTaskCounts(
capabilities: Capabilities,
cancelToken: CancelToken,
): Promise<TaskCounts> {
if (capabilities.hasSql()) {
const taskCountsFromQuery = await queryDruidSql<{ status: string; count: number }>({
const taskCountsFromQuery = await queryDruidSql<{ status: string; count: number }>(
{
query: `SELECT
CASE WHEN "status" = 'RUNNING' THEN "runner_status" ELSE "status" END AS "status",
COUNT(*) AS "count"
FROM sys.tasks
GROUP BY 1`,
});
},
cancelToken,
);
return lookupBy(
taskCountsFromQuery,
x => x.status.toLowerCase(),
x => x.count,
);
} else if (capabilities.hasOverlordAccess()) {
const tasks: any[] = (await Api.instance.get('/druid/indexer/v1/tasks')).data;
const tasks: any[] = (await Api.instance.get('/druid/indexer/v1/tasks', { cancelToken })).data;
return {
success: tasks.filter(d => getTaskStatus(d) === 'SUCCESS').length,
failed: tasks.filter(d => getTaskStatus(d) === 'FAILED').length,
@ -76,14 +83,14 @@ export interface TasksCardProps {
export const TasksCard = React.memo(function TasksCard(props: TasksCardProps) {
const [cardState] = useQueryManager<Capabilities, TaskCountsAndCapacity>({
processQuery: async capabilities => {
const taskCounts = await getTaskCounts(capabilities);
initQuery: props.capabilities,
processQuery: async (capabilities, cancelToken) => {
const taskCounts = await getTaskCounts(capabilities, cancelToken);
if (!capabilities.hasOverlordAccess()) return taskCounts;
const capacity = await getClusterCapacity();
return { ...taskCounts, ...capacity };
},
initQuery: props.capabilities,
});
const { success, failed, running, pending, waiting, totalTaskSlots } = cardState.data || {};

View File

@ -123,16 +123,19 @@ export class LookupsView extends React.PureComponent<LookupsViewProps, LookupsVi
};
this.lookupsQueryManager = new QueryManager({
processQuery: async () => {
processQuery: async (_, cancelToken) => {
const tiersResp = await Api.instance.get(
'/druid/coordinator/v1/lookups/config?discover=true',
{ cancelToken },
);
const tiers =
tiersResp.data && tiersResp.data.length > 0
? tiersResp.data.sort(tierNameCompare)
: [DEFAULT_LOOKUP_TIER];
const lookupResp = await Api.instance.get('/druid/coordinator/v1/lookups/config/all');
const lookupResp = await Api.instance.get('/druid/coordinator/v1/lookups/config/all', {
cancelToken,
});
const lookupData = lookupResp.data;
const lookupEntries: LookupEntry[] = [];

View File

@ -255,7 +255,7 @@ END AS "time_span"`,
this.segmentsQueryManager = new QueryManager({
debounceIdle: 500,
processQuery: async (query: SegmentsQuery, _cancelToken, setIntermediateQuery) => {
processQuery: async (query: SegmentsQuery, cancelToken, setIntermediateQuery) => {
const { page, pageSize, filtered, sorted, visibleColumns, capabilities, groupByInterval } =
query;
@ -356,10 +356,10 @@ END AS "time_span"`,
}
const sqlQuery = queryParts.join('\n');
setIntermediateQuery(sqlQuery);
return await queryDruidSql({ query: sqlQuery });
return await queryDruidSql({ query: sqlQuery }, cancelToken);
} else if (capabilities.hasCoordinatorAccess()) {
let datasourceList: string[] = (
await Api.instance.get('/druid/coordinator/v1/metadata/datasources')
await Api.instance.get('/druid/coordinator/v1/metadata/datasources', { cancelToken })
).data;
const datasourceFilter = filtered.find(({ id }) => id === 'datasource');
@ -383,6 +383,7 @@ END AS "time_span"`,
const segments = (
await Api.instance.get(
`/druid/coordinator/v1/datasources/${Api.encodePath(datasourceList[i])}?full`,
{ cancelToken },
)
).data?.segments;
if (!Array.isArray(segments)) continue;

View File

@ -18,7 +18,7 @@
import { Button, ButtonGroup, Intent, Label, MenuItem, Tag } from '@blueprintjs/core';
import { IconNames } from '@blueprintjs/icons';
import { sum } from 'd3-array';
import { max, sum } from 'd3-array';
import React from 'react';
import type { Filter } from 'react-table';
import ReactTable from 'react-table';
@ -40,21 +40,25 @@ import type { QueryWithContext } from '../../druid-models';
import type { Capabilities, CapabilitiesMode } from '../../helpers';
import { STANDARD_TABLE_PAGE_SIZE, STANDARD_TABLE_PAGE_SIZE_OPTIONS } from '../../react-table';
import { Api, AppToaster } from '../../singletons';
import type { NumberLike } from '../../utils';
import type { AuxiliaryQueryFn, NumberLike } from '../../utils';
import {
assemble,
deepGet,
filterMap,
formatBytes,
formatBytesCompact,
formatDurationWithMsIfNeeded,
hasPopoverOpen,
LocalStorageBackedVisibility,
LocalStorageKeys,
lookupBy,
oneOf,
pluralIfNeeded,
prettyFormatIsoDateWithMsIfNeeded,
queryDruidSql,
QueryManager,
QueryState,
ResultWithAuxiliaryWork,
} from '../../utils';
import type { BasicAction } from '../../utils/basic-action';
@ -97,28 +101,9 @@ const TABLE_COLUMNS_BY_MODE: Record<CapabilitiesMode, TableColumnSelectorColumn[
],
};
function formatQueues(
segmentsToLoad: NumberLike,
segmentsToLoadSize: NumberLike,
segmentsToDrop: NumberLike,
segmentsToDropSize: NumberLike,
): string {
const queueParts: string[] = [];
if (segmentsToLoad) {
queueParts.push(
`${pluralIfNeeded(segmentsToLoad, 'segment')} to load (${formatBytesCompact(
segmentsToLoadSize,
)})`,
);
}
if (segmentsToDrop) {
queueParts.push(
`${pluralIfNeeded(segmentsToDrop, 'segment')} to drop (${formatBytesCompact(
segmentsToDropSize,
)})`,
);
}
return queueParts.join(', ') || 'Empty load/drop queues';
interface ServicesQuery {
capabilities: Capabilities;
visibleColumns: LocalStorageBackedVisibility;
}
export interface ServicesViewProps {
@ -149,8 +134,8 @@ interface ServiceResultRow {
readonly plaintext_port: number;
readonly tls_port: number;
readonly start_time: string;
loadQueueInfo?: LoadQueueInfo;
workerInfo?: WorkerInfo;
readonly loadQueueInfo?: LoadQueueInfo;
readonly workerInfo?: WorkerInfo;
}
interface LoadQueueInfo {
@ -158,6 +143,44 @@ interface LoadQueueInfo {
readonly segmentsToDropSize: NumberLike;
readonly segmentsToLoad: NumberLike;
readonly segmentsToLoadSize: NumberLike;
readonly expectedLoadTimeMillis: NumberLike;
}
function formatLoadQueueInfo({
segmentsToDrop,
segmentsToDropSize,
segmentsToLoad,
segmentsToLoadSize,
expectedLoadTimeMillis,
}: LoadQueueInfo): string {
return (
assemble(
segmentsToLoad
? `${pluralIfNeeded(segmentsToLoad, 'segment')} to load (${formatBytesCompact(
segmentsToLoadSize,
)}${
expectedLoadTimeMillis
? `, ${formatDurationWithMsIfNeeded(expectedLoadTimeMillis)}`
: ''
})`
: undefined,
segmentsToDrop
? `${pluralIfNeeded(segmentsToDrop, 'segment')} to drop (${formatBytesCompact(
segmentsToDropSize,
)})`
: undefined,
).join(', ') || 'Empty load/drop queues'
);
}
function aggregateLoadQueueInfos(loadQueueInfos: LoadQueueInfo[]): LoadQueueInfo {
return {
segmentsToLoad: sum(loadQueueInfos, s => Number(s.segmentsToLoad) || 0),
segmentsToLoadSize: sum(loadQueueInfos, s => Number(s.segmentsToLoadSize) || 0),
segmentsToDrop: sum(loadQueueInfos, s => Number(s.segmentsToDrop) || 0),
segmentsToDropSize: sum(loadQueueInfos, s => Number(s.segmentsToDropSize) || 0),
expectedLoadTimeMillis: max(loadQueueInfos, s => Number(s.expectedLoadTimeMillis) || 0) || 0,
};
}
interface WorkerInfo {
@ -178,7 +201,7 @@ interface WorkerInfo {
}
export class ServicesView extends React.PureComponent<ServicesViewProps, ServicesViewState> {
private readonly serviceQueryManager: QueryManager<Capabilities, ServiceResultRow[]>;
private readonly serviceQueryManager: QueryManager<ServicesQuery, ServiceResultRow[]>;
// Ranking
// coordinator => 8
@ -218,23 +241,6 @@ ORDER BY
) DESC,
"service" DESC`;
static async getServices(): Promise<ServiceResultRow[]> {
const allServiceResp = await Api.instance.get('/druid/coordinator/v1/servers?simple');
const allServices = allServiceResp.data;
return allServices.map((s: any) => {
return {
service: s.host,
service_type: s.type === 'indexer-executor' ? 'peon' : s.type,
tier: s.tier,
host: s.host.split(':')[0],
plaintext_port: parseInt(s.host.split(':')[1], 10),
curr_size: s.currSize,
max_size: s.maxSize,
tls_port: -1,
};
});
}
constructor(props: ServicesViewProps) {
super(props);
this.state = {
@ -246,48 +252,75 @@ ORDER BY
};
this.serviceQueryManager = new QueryManager({
processQuery: async capabilities => {
processQuery: async ({ capabilities, visibleColumns }, cancelToken) => {
let services: ServiceResultRow[];
if (capabilities.hasSql()) {
services = await queryDruidSql({ query: ServicesView.SERVICE_SQL });
services = await queryDruidSql({ query: ServicesView.SERVICE_SQL }, cancelToken);
} else if (capabilities.hasCoordinatorAccess()) {
services = await ServicesView.getServices();
services = (
await Api.instance.get('/druid/coordinator/v1/servers?simple', { cancelToken })
).data.map((s: any): ServiceResultRow => {
const hostParts = s.host.split(':');
const port = parseInt(hostParts[1], 10);
return {
service: s.host,
service_type: s.type === 'indexer-executor' ? 'peon' : s.type,
tier: s.tier,
host: hostParts[0],
plaintext_port: port < 9000 ? port : -1,
tls_port: port < 9000 ? -1 : port,
curr_size: s.currSize,
max_size: s.maxSize,
start_time: '1970:01:01T00:00:00Z',
is_leader: 0,
};
});
} else {
throw new Error(`must have SQL or coordinator access`);
}
if (capabilities.hasCoordinatorAccess()) {
const auxiliaryQueries: AuxiliaryQueryFn<ServiceResultRow[]>[] = [];
if (capabilities.hasCoordinatorAccess() && visibleColumns.shown('Details')) {
auxiliaryQueries.push(async (services, cancelToken) => {
try {
const loadQueueInfos = (
await Api.instance.get<Record<string, LoadQueueInfo>>(
'/druid/coordinator/v1/loadqueue?simple',
{ cancelToken },
)
).data;
services.forEach(s => {
s.loadQueueInfo = loadQueueInfos[s.service];
});
return services.map(s => ({
...s,
loadQueueInfo: loadQueueInfos[s.service],
}));
} catch {
AppToaster.show({
icon: IconNames.ERROR,
intent: Intent.DANGER,
message: 'There was an error getting the load queue info',
});
return services;
}
});
}
if (capabilities.hasOverlordAccess()) {
auxiliaryQueries.push(async (services, cancelToken) => {
try {
const workerInfos = (await Api.instance.get<WorkerInfo[]>('/druid/indexer/v1/workers'))
.data;
const workerInfos = (
await Api.instance.get<WorkerInfo[]>('/druid/indexer/v1/workers', { cancelToken })
).data;
const workerInfoLookup: Record<string, WorkerInfo> = lookupBy(
workerInfos,
m => m.worker?.host,
);
services.forEach(s => {
s.workerInfo = workerInfoLookup[s.service];
});
return services.map(s => ({
...s,
workerInfo: workerInfoLookup[s.service],
}));
} catch (e) {
// Swallow this error because it simply a reflection of a local task runner.
if (
@ -299,10 +332,12 @@ ORDER BY
message: 'There was an error getting the worker info',
});
}
return services;
}
});
}
return services;
return new ResultWithAuxiliaryWork(services, auxiliaryQueries);
},
onStateChange: servicesState => {
this.setState({
@ -314,7 +349,8 @@ ORDER BY
componentDidMount(): void {
const { capabilities } = this.props;
this.serviceQueryManager.runQuery(capabilities);
const { visibleColumns } = this.state;
this.serviceQueryManager.runQuery({ capabilities, visibleColumns });
}
componentWillUnmount(): void {
@ -490,10 +526,7 @@ ORDER BY
)
) {
const workerInfos: WorkerInfo[] = filterMap(originalRows, r => r.workerInfo);
if (!workerInfos.length) {
return 'Could not get worker infos';
}
if (!workerInfos.length) return '';
const totalCurrCapacityUsed = sum(
workerInfos,
@ -517,9 +550,7 @@ ORDER BY
case 'indexer':
case 'middle_manager': {
if (!deepGet(original, 'workerInfo')) {
return 'Could not get capacity info';
}
if (!deepGet(original, 'workerInfo')) return '';
const currCapacityUsed = deepGet(original, 'workerInfo.currCapacityUsed') || 0;
const capacity = deepGet(original, 'workerInfo.worker.capacity');
if (typeof capacity === 'number') {
@ -554,18 +585,24 @@ ORDER BY
case 'middle_manager':
case 'indexer': {
const { workerInfo } = row;
if (!workerInfo) {
return 'Could not get detail info';
}
if (!workerInfo) return '';
if (workerInfo.worker.version === '') return 'Disabled';
const details: string[] = [];
if (workerInfo.lastCompletedTaskTime) {
details.push(`Last completed task: ${workerInfo.lastCompletedTaskTime}`);
details.push(
`Last completed task: ${prettyFormatIsoDateWithMsIfNeeded(
workerInfo.lastCompletedTaskTime,
)}`,
);
}
if (workerInfo.blacklistedUntil) {
details.push(`Blacklisted until: ${workerInfo.blacklistedUntil}`);
details.push(
`Blacklisted until: ${prettyFormatIsoDateWithMsIfNeeded(
workerInfo.blacklistedUntil,
)}`,
);
}
return details.join(' ');
}
@ -599,16 +636,7 @@ ORDER BY
case 'historical': {
const { loadQueueInfo } = original;
if (!loadQueueInfo) return 'Could not get load queue info';
const { segmentsToLoad, segmentsToLoadSize, segmentsToDrop, segmentsToDropSize } =
loadQueueInfo;
return formatQueues(
segmentsToLoad,
segmentsToLoadSize,
segmentsToDrop,
segmentsToDropSize,
);
return loadQueueInfo ? formatLoadQueueInfo(loadQueueInfo) : '';
}
default:
@ -618,29 +646,10 @@ ORDER BY
Aggregated: ({ subRows }) => {
const originalRows = subRows.map(r => r._original);
if (!originalRows.some(r => r.service_type === 'historical')) return '';
const loadQueueInfos: LoadQueueInfo[] = filterMap(originalRows, r => r.loadQueueInfo);
if (!loadQueueInfos.length) {
return 'Could not get load queue infos';
}
const segmentsToLoad = sum(loadQueueInfos, s => Number(s.segmentsToLoad) || 0);
const segmentsToLoadSize = sum(
loadQueueInfos,
s => Number(s.segmentsToLoadSize) || 0,
);
const segmentsToDrop = sum(loadQueueInfos, s => Number(s.segmentsToDrop) || 0);
const segmentsToDropSize = sum(
loadQueueInfos,
s => Number(s.segmentsToDropSize) || 0,
);
return formatQueues(
segmentsToLoad,
segmentsToLoadSize,
segmentsToDrop,
segmentsToDropSize,
);
return loadQueueInfos.length
? formatLoadQueueInfo(aggregateLoadQueueInfos(loadQueueInfos))
: '';
},
},
{

View File

@ -407,12 +407,15 @@ export const SchemaStep = function SchemaStep(props: SchemaStepProps) {
const [existingTableState] = useQueryManager<string, string[]>({
initQuery: '',
processQuery: async (_: string, _cancelToken) => {
processQuery: async (_, cancelToken) => {
// Check if datasource already exists
const tables = await queryDruidSql({
const tables = await queryDruidSql(
{
query: `SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'druid' ORDER BY TABLE_NAME ASC`,
resultFormat: 'array',
});
},
cancelToken,
);
return tables.map(t => t[0]);
},
@ -426,7 +429,7 @@ export const SchemaStep = function SchemaStep(props: SchemaStepProps) {
const [sampleState] = useQueryManager<ExternalConfig, QueryResult, Execution>({
query: sampleExternalConfig,
processQuery: async sampleExternalConfig => {
processQuery: async (sampleExternalConfig, cancelToken) => {
const sampleResponse = await postToSampler(
{
type: 'index_parallel',
@ -469,6 +472,7 @@ export const SchemaStep = function SchemaStep(props: SchemaStepProps) {
},
},
'sample',
cancelToken,
);
const columns = getHeaderFromSampleResponse(sampleResponse).map(({ name, type }) => {

View File

@ -165,13 +165,16 @@ ORDER BY
};
this.taskQueryManager = new QueryManager({
processQuery: async capabilities => {
processQuery: async (capabilities, cancelToken) => {
if (capabilities.hasSql()) {
return await queryDruidSql({
return await queryDruidSql(
{
query: TasksView.TASK_SQL,
});
},
cancelToken,
);
} else if (capabilities.hasOverlordAccess()) {
const resp = await Api.instance.get(`/druid/indexer/v1/tasks`);
const resp = await Api.instance.get(`/druid/indexer/v1/tasks`, { cancelToken });
return TasksView.parseTasks(resp.data);
} else {
throw new Error(`must have SQL or overlord access`);

View File

@ -66,8 +66,8 @@ export const CurrentDartPanel = React.memo(function CurrentViberPanel(
const [dartQueryEntriesState, queryManager] = useQueryManager<number, DartQueryEntry[]>({
query: workStateVersion,
processQuery: async _ => {
return (await Api.instance.get('/druid/v2/sql/dart')).data.queries;
processQuery: async (_, cancelToken) => {
return (await Api.instance.get('/druid/v2/sql/dart', { cancelToken })).data.queries;
},
});

View File

@ -39,11 +39,11 @@ export const ExecutionDetailsPaneLoader = React.memo(function ExecutionDetailsPa
const { id, initTab, initExecution, goToTask } = props;
const [executionState, queryManager] = useQueryManager<string, Execution>({
processQuery: (id: string) => {
return getTaskExecution(id);
},
initQuery: initExecution ? undefined : id,
initState: initExecution ? new QueryState({ data: initExecution }) : undefined,
processQuery: (id, cancelToken) => {
return getTaskExecution(id, undefined, cancelToken);
},
});
useInterval(() => {

View File

@ -35,10 +35,10 @@ export const ExecutionStagesPaneLoader = React.memo(function ExecutionStagesPane
const { id, goToTask } = props;
const [executionState, queryManager] = useQueryManager<string, Execution>({
processQuery: (id: string) => {
return getTaskExecution(id);
},
initQuery: id,
processQuery: (id, cancelToken) => {
return getTaskExecution(id, undefined, cancelToken);
},
});
useInterval(() => {

View File

@ -75,7 +75,8 @@ export const ExplainDialog = React.memo(function ExplainDialog(props: ExplainDia
const { queryWithContext, onClose, openQueryLabel, onOpenQuery, mandatoryQueryContext } = props;
const [explainState] = useQueryManager<QueryContextEngine, QueryExplanation[] | string>({
processQuery: async queryWithContext => {
initQuery: queryWithContext,
processQuery: async (queryWithContext, cancelToken) => {
const { engine, queryString, queryContext, wrapQueryLimit } = queryWithContext;
let context: QueryContext | undefined;
@ -98,19 +99,19 @@ export const ExplainDialog = React.memo(function ExplainDialog(props: ExplainDia
let result: any[];
switch (engine) {
case 'sql-native':
result = await queryDruidSql(payload);
result = await queryDruidSql(payload, cancelToken);
break;
case 'sql-msq-task':
try {
result = (await Api.instance.post(`/druid/v2/sql/task`, payload)).data;
result = (await Api.instance.post(`/druid/v2/sql/task`, payload, { cancelToken })).data;
} catch (e) {
throw new Error(getDruidErrorMessage(e));
}
break;
case 'sql-msq-dart':
result = await queryDruidSqlDart(payload);
result = await queryDruidSqlDart(payload, cancelToken);
break;
default:
@ -128,7 +129,6 @@ export const ExplainDialog = React.memo(function ExplainDialog(props: ExplainDia
return plan;
}
},
initQuery: queryWithContext,
});
let content: JSX.Element;

View File

@ -98,7 +98,7 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
const [previewState] = useQueryManager<InputSourceAndFormat, SampleResponse>({
query: inputSourceAndFormatToSample,
processQuery: async ({ inputSource, inputFormat }) => {
processQuery: async ({ inputSource, inputFormat }, cancelToken) => {
const fixedFormatSource = inputSource.type === 'delta';
if (!fixedFormatSource && !isValidInputFormat(inputFormat))
throw new Error('invalid input format');
@ -131,7 +131,7 @@ export const InputFormatStep = React.memo(function InputFormatStep(props: InputF
},
};
return await postToSampler(sampleSpec, 'input-format-step');
return await postToSampler(sampleSpec, 'input-format-step', cancelToken);
},
});

View File

@ -90,8 +90,9 @@ export const RecentQueryTaskPanel = React.memo(function RecentQueryTaskPanel(
const [queryTaskHistoryState, queryManager] = useQueryManager<number, RecentQueryEntry[]>({
query: workStateVersion,
processQuery: async _ => {
return await queryDruidSql<RecentQueryEntry>({
processQuery: async (_, cancelToken) => {
return await queryDruidSql<RecentQueryEntry>(
{
query: `SELECT
CASE WHEN ${TASK_CANCELED_PREDICATE} THEN 'CANCELED' ELSE "status" END AS "taskStatus",
"task_id" AS "taskId",
@ -103,7 +104,9 @@ FROM sys.tasks
WHERE "type" = 'query_controller'
ORDER BY "created_time" DESC
LIMIT 100`,
});
},
cancelToken,
);
},
});

View File

@ -212,10 +212,13 @@ export class WorkbenchView extends React.PureComponent<WorkbenchViewProps, Workb
componentDidMount(): void {
this.metadataQueryManager = new QueryManager({
processQuery: async () => {
return await queryDruidSql<ColumnMetadata>({
processQuery: async (_, cancelToken) => {
return await queryDruidSql<ColumnMetadata>(
{
query: `SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS`,
});
},
cancelToken,
);
},
onStateChange: columnMetadataState => {
if (columnMetadataState.error) {