mirror of https://github.com/apache/druid.git
Added UI support for waitTillSegmentsLoad (#15110)
This relies on the work done in #14322 and #15076. It allows the user to set waitTillSegmentsLoad in the query context (if they want, else it defaults to true) and shows the results in the UI :
This commit is contained in:
parent
5f86072456
commit
dba0246aca
|
@ -268,6 +268,7 @@ describe('Execution', () => {
|
|||
"maxNumTasks": 2,
|
||||
},
|
||||
"result": undefined,
|
||||
"segmentStatus": undefined,
|
||||
"sqlQuery": "REPLACE INTO \\"kttm_simple\\" OVERWRITE ALL
|
||||
SELECT
|
||||
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
|
||||
|
@ -643,6 +644,7 @@ describe('Execution', () => {
|
|||
"sqlQuery": undefined,
|
||||
"sqlQueryId": undefined,
|
||||
},
|
||||
"segmentStatus": undefined,
|
||||
"sqlQuery": undefined,
|
||||
"stages": undefined,
|
||||
"startTime": 2023-07-05T21:33:19.147Z,
|
||||
|
@ -679,6 +681,7 @@ describe('Execution', () => {
|
|||
"nativeQuery": undefined,
|
||||
"queryContext": undefined,
|
||||
"result": undefined,
|
||||
"segmentStatus": undefined,
|
||||
"sqlQuery": undefined,
|
||||
"stages": undefined,
|
||||
"startTime": 2023-07-05T21:40:39.986Z,
|
||||
|
|
|
@ -164,6 +164,18 @@ function formatPendingMessage(
|
|||
}
|
||||
}
|
||||
|
||||
interface SegmentStatus {
|
||||
duration: number;
|
||||
onDemandSegments: number;
|
||||
pendingSegments: number;
|
||||
precachedSegments: number;
|
||||
startTime: Date;
|
||||
state: 'INIT' | 'WAITING' | 'SUCCESS';
|
||||
totalSegments: number;
|
||||
unknownSegments: number;
|
||||
usedSegments: number;
|
||||
}
|
||||
|
||||
export interface ExecutionValue {
|
||||
engine: DruidEngine;
|
||||
id: string;
|
||||
|
@ -182,6 +194,7 @@ export interface ExecutionValue {
|
|||
warnings?: ExecutionError[];
|
||||
capacityInfo?: CapacityInfo;
|
||||
_payload?: MsqTaskPayloadResponse;
|
||||
segmentStatus?: SegmentStatus;
|
||||
}
|
||||
|
||||
export class Execution {
|
||||
|
@ -292,6 +305,11 @@ export class Execution {
|
|||
const startTime = new Date(deepGet(taskReport, 'multiStageQuery.payload.status.startTime'));
|
||||
const durationMs = deepGet(taskReport, 'multiStageQuery.payload.status.durationMs');
|
||||
|
||||
const segmentLoaderStatus = deepGet(
|
||||
taskReport,
|
||||
'multiStageQuery.payload.status.segmentLoadWaiterStatus',
|
||||
);
|
||||
|
||||
let result: QueryResult | undefined;
|
||||
const resultsPayload: {
|
||||
signature: { name: string; type: string }[];
|
||||
|
@ -313,6 +331,7 @@ export class Execution {
|
|||
engine: 'sql-msq-task',
|
||||
id,
|
||||
status: Execution.normalizeTaskStatus(status),
|
||||
segmentStatus: segmentLoaderStatus,
|
||||
startTime: isNaN(startTime.getTime()) ? undefined : startTime,
|
||||
duration: typeof durationMs === 'number' ? durationMs : undefined,
|
||||
usageInfo: getUsageInfoFromStatusPayload(
|
||||
|
@ -369,6 +388,7 @@ export class Execution {
|
|||
public readonly error?: ExecutionError;
|
||||
public readonly warnings?: ExecutionError[];
|
||||
public readonly capacityInfo?: CapacityInfo;
|
||||
public readonly segmentStatus?: SegmentStatus;
|
||||
|
||||
public readonly _payload?: { payload: any; task: string };
|
||||
|
||||
|
@ -390,6 +410,7 @@ export class Execution {
|
|||
this.error = value.error;
|
||||
this.warnings = nonEmptyArray(value.warnings) ? value.warnings : undefined;
|
||||
this.capacityInfo = value.capacityInfo;
|
||||
this.segmentStatus = value.segmentStatus;
|
||||
|
||||
this._payload = value._payload;
|
||||
}
|
||||
|
@ -412,6 +433,7 @@ export class Execution {
|
|||
error: this.error,
|
||||
warnings: this.warnings,
|
||||
capacityInfo: this.capacityInfo,
|
||||
segmentStatus: this.segmentStatus,
|
||||
|
||||
_payload: this._payload,
|
||||
};
|
||||
|
@ -526,6 +548,34 @@ export class Execution {
|
|||
return status !== 'SUCCESS' && status !== 'FAILED';
|
||||
}
|
||||
|
||||
public getSegmentStatusDescription() {
|
||||
const { segmentStatus } = this;
|
||||
|
||||
let label = '';
|
||||
|
||||
switch (segmentStatus?.state) {
|
||||
case 'INIT':
|
||||
label = 'Waiting for segments loading to start...';
|
||||
break;
|
||||
|
||||
case 'WAITING':
|
||||
label = 'Waiting for segments loading to complete...';
|
||||
break;
|
||||
|
||||
case 'SUCCESS':
|
||||
label = 'Segments loaded successfully in ' + segmentStatus.duration + 'ms.';
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return {
|
||||
label,
|
||||
...segmentStatus,
|
||||
};
|
||||
}
|
||||
|
||||
public isFullyComplete(): boolean {
|
||||
if (this.isWaitingForQuery()) return false;
|
||||
|
||||
|
|
|
@ -162,6 +162,22 @@ export function changeFinalizeAggregations(
|
|||
: deepDelete(context, 'finalizeAggregations');
|
||||
}
|
||||
|
||||
// waitTillSegmentsLoad
|
||||
|
||||
export function getWaitTillSegmentsLoad(context: QueryContext): boolean | undefined {
|
||||
const { waitTillSegmentsLoad } = context;
|
||||
return typeof waitTillSegmentsLoad === 'boolean' ? waitTillSegmentsLoad : undefined;
|
||||
}
|
||||
|
||||
export function changeWaitTillSegmentsLoad(
|
||||
context: QueryContext,
|
||||
waitTillSegmentsLoad: boolean | undefined,
|
||||
): QueryContext {
|
||||
return typeof waitTillSegmentsLoad === 'boolean'
|
||||
? deepSet(context, 'waitTillSegmentsLoad', waitTillSegmentsLoad)
|
||||
: deepDelete(context, 'waitTillSegmentsLoad');
|
||||
}
|
||||
|
||||
// groupByEnableMultiValueUnnesting
|
||||
|
||||
export function getGroupByEnableMultiValueUnnesting(context: QueryContext): boolean | undefined {
|
||||
|
|
|
@ -423,6 +423,7 @@ describe('WorkbenchQuery', () => {
|
|||
finalizeAggregations: false,
|
||||
groupByEnableMultiValueUnnesting: false,
|
||||
useCache: false,
|
||||
waitTillSegmentsLoad: true,
|
||||
},
|
||||
header: true,
|
||||
query: 'INSERT INTO wiki2 SELECT * FROM wikipedia',
|
||||
|
|
|
@ -552,6 +552,7 @@ export class WorkbenchQuery {
|
|||
apiQuery.context.executionMode ??= 'async';
|
||||
apiQuery.context.finalizeAggregations ??= !ingestQuery;
|
||||
apiQuery.context.groupByEnableMultiValueUnnesting ??= !ingestQuery;
|
||||
apiQuery.context.waitTillSegmentsLoad ??= true;
|
||||
}
|
||||
|
||||
if (Array.isArray(queryParameters) && queryParameters.length) {
|
||||
|
|
|
@ -57,7 +57,13 @@ export interface SubmitTaskQueryOptions {
|
|||
export async function submitTaskQuery(
|
||||
options: SubmitTaskQueryOptions,
|
||||
): Promise<Execution | IntermediateQueryState<Execution>> {
|
||||
const { query, context, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;
|
||||
const { query, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;
|
||||
|
||||
// setting waitTillSegmentsLoad to true by default
|
||||
const context = {
|
||||
waitTillSegmentsLoad: true,
|
||||
...(options.context || {}),
|
||||
};
|
||||
|
||||
let sqlQuery: string;
|
||||
let jsonQuery: Record<string, any>;
|
||||
|
@ -261,6 +267,11 @@ export async function updateExecutionWithDatasourceLoadedIfNeeded(
|
|||
return execution;
|
||||
}
|
||||
|
||||
// This means we don't have to perform the SQL query to check if the segments are loaded
|
||||
if (execution.queryContext?.waitTillSegmentsLoad === true) {
|
||||
return execution.markDestinationDatasourceLoaded();
|
||||
}
|
||||
|
||||
const endTime = execution.getEndTime();
|
||||
if (
|
||||
!endTime || // If endTime is not set (this is not expected to happen) then just bow out
|
||||
|
|
|
@ -22,6 +22,7 @@ exports[`ExecutionDetailsPane matches snapshot no init tab 1`] = `
|
|||
"id": "native",
|
||||
"label": "Native query",
|
||||
},
|
||||
false,
|
||||
undefined,
|
||||
undefined,
|
||||
Object {
|
||||
|
@ -286,6 +287,7 @@ PARTITIONED BY DAY",
|
|||
"maxParseExceptions": 2,
|
||||
},
|
||||
"result": undefined,
|
||||
"segmentStatus": undefined,
|
||||
"sqlQuery": "REPLACE INTO \\"kttm-blank-lines\\" OVERWRITE ALL
|
||||
SELECT
|
||||
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
|
||||
|
@ -909,6 +911,7 @@ PARTITIONED BY DAY",
|
|||
"maxParseExceptions": 2,
|
||||
},
|
||||
"result": undefined,
|
||||
"segmentStatus": undefined,
|
||||
"sqlQuery": "REPLACE INTO \\"kttm-blank-lines\\" OVERWRITE ALL
|
||||
SELECT
|
||||
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
|
||||
|
@ -1319,6 +1322,7 @@ exports[`ExecutionDetailsPane matches snapshot with init tab 1`] = `
|
|||
"id": "native",
|
||||
"label": "Native query",
|
||||
},
|
||||
false,
|
||||
undefined,
|
||||
undefined,
|
||||
Object {
|
||||
|
@ -1576,6 +1580,7 @@ PARTITIONED BY DAY",
|
|||
"maxParseExceptions": 2,
|
||||
},
|
||||
"result": undefined,
|
||||
"segmentStatus": undefined,
|
||||
"sqlQuery": "REPLACE INTO \\"kttm-blank-lines\\" OVERWRITE ALL
|
||||
SELECT
|
||||
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
|
||||
|
|
|
@ -23,7 +23,7 @@ import React, { useState } from 'react';
|
|||
|
||||
import { FancyTabPane } from '../../../components';
|
||||
import type { Execution } from '../../../druid-models';
|
||||
import { pluralIfNeeded } from '../../../utils';
|
||||
import { formatDuration, formatDurationWithMs, pluralIfNeeded } from '../../../utils';
|
||||
import { DestinationPagesPane } from '../destination-pages-pane/destination-pages-pane';
|
||||
import { ExecutionErrorPane } from '../execution-error-pane/execution-error-pane';
|
||||
import { ExecutionStagesPane } from '../execution-stages-pane/execution-stages-pane';
|
||||
|
@ -40,7 +40,8 @@ export type ExecutionDetailsTab =
|
|||
| 'result'
|
||||
| 'pages'
|
||||
| 'error'
|
||||
| 'warnings';
|
||||
| 'warnings'
|
||||
| 'segmentStatus';
|
||||
|
||||
interface ExecutionDetailsPaneProps {
|
||||
execution: Execution;
|
||||
|
@ -53,6 +54,7 @@ export const ExecutionDetailsPane = React.memo(function ExecutionDetailsPane(
|
|||
) {
|
||||
const { execution, initTab, goToTask } = props;
|
||||
const [activeTab, setActiveTab] = useState<ExecutionDetailsTab>(initTab || 'general');
|
||||
const segmentStatusDescription = execution.getSegmentStatusDescription();
|
||||
|
||||
function renderContent() {
|
||||
switch (activeTab) {
|
||||
|
@ -120,6 +122,25 @@ export const ExecutionDetailsPane = React.memo(function ExecutionDetailsPane(
|
|||
case 'warnings':
|
||||
return <ExecutionWarningsPane execution={execution} />;
|
||||
|
||||
case 'segmentStatus':
|
||||
return (
|
||||
<>
|
||||
<p>
|
||||
Duration:{' '}
|
||||
{segmentStatusDescription.duration
|
||||
? formatDurationWithMs(segmentStatusDescription.duration)
|
||||
: '-'}
|
||||
{execution.duration
|
||||
? ` (query duration was ${formatDuration(execution.duration)})`
|
||||
: ''}
|
||||
</p>
|
||||
<p>Total segments: {segmentStatusDescription.totalSegments ?? '-'}</p>
|
||||
<p>Used segments: {segmentStatusDescription.usedSegments ?? '-'}</p>
|
||||
<p>Precached segments: {segmentStatusDescription.precachedSegments ?? '-'}</p>
|
||||
<p>On demand segments: {segmentStatusDescription.onDemandSegments ?? '-'}</p>
|
||||
</>
|
||||
);
|
||||
|
||||
default:
|
||||
return;
|
||||
}
|
||||
|
@ -146,6 +167,11 @@ export const ExecutionDetailsPane = React.memo(function ExecutionDetailsPane(
|
|||
label: 'Native query',
|
||||
icon: IconNames.COG,
|
||||
},
|
||||
Boolean(execution.segmentStatus) && {
|
||||
id: 'segmentStatus',
|
||||
label: 'Segments',
|
||||
icon: IconNames.HEAT_GRID,
|
||||
},
|
||||
execution.result && {
|
||||
id: 'result',
|
||||
label: 'Results',
|
||||
|
|
|
@ -20,5 +20,8 @@ exports[`ExecutionProgressBarPane matches snapshot 1`] = `
|
|||
className="overall"
|
||||
intent="primary"
|
||||
/>
|
||||
<Unknown>
|
||||
|
||||
</Unknown>
|
||||
</div>
|
||||
`;
|
||||
|
|
|
@ -50,6 +50,9 @@ export const ExecutionProgressBarPane = React.memo(function ExecutionProgressBar
|
|||
|
||||
const idx = stages ? stages.currentStageIndex() : -1;
|
||||
const waitingForSegments = stages && !execution.isWaitingForQuery();
|
||||
|
||||
const segmentStatusDescription = execution?.getSegmentStatusDescription();
|
||||
|
||||
return (
|
||||
<div className="execution-progress-bar-pane">
|
||||
<Label>
|
||||
|
@ -78,6 +81,7 @@ export const ExecutionProgressBarPane = React.memo(function ExecutionProgressBar
|
|||
intent={stages ? Intent.PRIMARY : undefined}
|
||||
value={stages && execution.isWaitingForQuery() ? stages.overallProgress() : undefined}
|
||||
/>
|
||||
{segmentStatusDescription && <Label>{segmentStatusDescription.label}</Label>}
|
||||
{stages && idx >= 0 && (
|
||||
<>
|
||||
<Label>{`Current stage (${idx + 1} of ${stages.stageCount()})`}</Label>
|
||||
|
|
|
@ -9,6 +9,7 @@ exports[`IngestSuccessPane matches snapshot 1`] = `
|
|||
</p>
|
||||
<p>
|
||||
Insert query took 0:00:23.
|
||||
|
||||
<span
|
||||
className="action"
|
||||
onClick={[Function]}
|
||||
|
|
|
@ -44,7 +44,9 @@ export const IngestSuccessPane = React.memo(function IngestSuccessPane(
|
|||
|
||||
const warnings = execution.stages?.getWarningCount() || 0;
|
||||
|
||||
const duration = execution.duration;
|
||||
const { duration } = execution;
|
||||
const segmentStatusDescription = execution.getSegmentStatusDescription();
|
||||
|
||||
return (
|
||||
<div className="ingest-success-pane">
|
||||
<p>
|
||||
|
@ -63,10 +65,12 @@ export const IngestSuccessPane = React.memo(function IngestSuccessPane(
|
|||
</p>
|
||||
<p>
|
||||
{duration ? `Insert query took ${formatDuration(duration)}. ` : `Insert query completed. `}
|
||||
{segmentStatusDescription ? segmentStatusDescription.label + ' ' : ''}
|
||||
<span className="action" onClick={() => onDetails(execution.id)}>
|
||||
Show details
|
||||
</span>
|
||||
</p>
|
||||
|
||||
{onQueryTab && (
|
||||
<p>
|
||||
Open new tab with:{' '}
|
||||
|
|
|
@ -45,6 +45,7 @@ import {
|
|||
changeUseApproximateCountDistinct,
|
||||
changeUseApproximateTopN,
|
||||
changeUseCache,
|
||||
changeWaitTillSegmentsLoad,
|
||||
getDurableShuffleStorage,
|
||||
getFinalizeAggregations,
|
||||
getGroupByEnableMultiValueUnnesting,
|
||||
|
@ -53,6 +54,7 @@ import {
|
|||
getUseApproximateCountDistinct,
|
||||
getUseApproximateTopN,
|
||||
getUseCache,
|
||||
getWaitTillSegmentsLoad,
|
||||
summarizeIndexSpec,
|
||||
} from '../../../druid-models';
|
||||
import { deepGet, deepSet, pluralIfNeeded, tickIcon } from '../../../utils';
|
||||
|
@ -110,6 +112,7 @@ export const RunPanel = React.memo(function RunPanel(props: RunPanelProps) {
|
|||
|
||||
const maxParseExceptions = getMaxParseExceptions(queryContext);
|
||||
const finalizeAggregations = getFinalizeAggregations(queryContext);
|
||||
const waitTillSegmentsLoad = getWaitTillSegmentsLoad(queryContext);
|
||||
const groupByEnableMultiValueUnnesting = getGroupByEnableMultiValueUnnesting(queryContext);
|
||||
const sqlJoinAlgorithm = queryContext.sqlJoinAlgorithm ?? 'broadcast';
|
||||
const selectDestination = queryContext.selectDestination ?? 'taskReport';
|
||||
|
@ -311,6 +314,15 @@ export const RunPanel = React.memo(function RunPanel(props: RunPanelProps) {
|
|||
changeQueryContext(changeFinalizeAggregations(queryContext, v))
|
||||
}
|
||||
/>
|
||||
<MenuTristate
|
||||
icon={IconNames.STOPWATCH}
|
||||
text="Wait until segments have loaded"
|
||||
value={waitTillSegmentsLoad}
|
||||
undefinedEffectiveValue /* ={true} */
|
||||
onValueChange={v =>
|
||||
changeQueryContext(changeWaitTillSegmentsLoad(queryContext, v))
|
||||
}
|
||||
/>
|
||||
<MenuTristate
|
||||
icon={IconNames.FORK}
|
||||
text="Enable GroupBy multi-value unnesting"
|
||||
|
|
Loading…
Reference in New Issue