Web console: segment writing progress indication (#13929)

* add segment writing progress indication

* update with more metrics

* add push metric
This commit is contained in:
Vadim Ogievetsky 2023-03-22 16:34:38 -07:00 committed by GitHub
parent d81d13b9ba
commit 8d125b7c7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 125 additions and 19 deletions

View File

@ -102,6 +102,7 @@ export interface StageWorkerCounter {
output?: ChannelCounter; output?: ChannelCounter;
shuffle?: ChannelCounter; shuffle?: ChannelCounter;
sortProgress?: SortProgressCounter; sortProgress?: SortProgressCounter;
segmentGenerationProgress?: SegmentGenerationProgressCounter;
warnings?: WarningCounter; warnings?: WarningCounter;
} }
@ -146,6 +147,20 @@ export interface SortProgressCounter {
triviallyComplete?: boolean; triviallyComplete?: boolean;
} }
/**
 * Counter holding row counts for segment generation.
 *
 * It is carried as the optional `segmentGenerationProgress` field of
 * `StageWorkerCounter` and `SimpleWideCounter`; the `type` literal tags the
 * counter kind.
 *
 * NOTE(review): the precise meaning of each row count is defined by whatever
 * produces these counters, not by this file - confirm against the producer.
 */
export interface SegmentGenerationProgressCounter {
type: 'segmentGenerationProgress';
rowsProcessed: number;
rowsPersisted: number;
// Displayed under the "Merged" rows heading in the execution stages pane.
rowsMerged: number;
// Displayed under the "Pushed" rows heading; also used as the progress numerator for a segmentGenerator stage.
rowsPushed: number;
}
/**
 * Names of the numeric row-count fields of `SegmentGenerationProgressCounter`
 * (every field except the `type` tag). Used to select which count to total.
 */
export type SegmentGenerationProgressFields =
| 'rowsProcessed'
| 'rowsPersisted'
| 'rowsMerged'
| 'rowsPushed';
export interface WarningCounter { export interface WarningCounter {
type: 'warning'; type: 'warning';
CannotParseExternalData?: number; CannotParseExternalData?: number;
@ -157,6 +172,7 @@ export interface SimpleWideCounter {
[k: `input${number}`]: Record<ChannelFields, number> | undefined; [k: `input${number}`]: Record<ChannelFields, number> | undefined;
output?: Record<ChannelFields, number>; output?: Record<ChannelFields, number>;
shuffle?: Record<ChannelFields, number>; shuffle?: Record<ChannelFields, number>;
segmentGenerationProgress?: SegmentGenerationProgressCounter;
} }
function zeroChannelFields(): Record<ChannelFields, number> { function zeroChannelFields(): Record<ChannelFields, number> {
@ -173,8 +189,12 @@ export class Stages {
static readonly QUERY_START_FACTOR = 0.05; static readonly QUERY_START_FACTOR = 0.05;
static readonly QUERY_END_FACTOR = 0.05; static readonly QUERY_END_FACTOR = 0.05;
/**
 * Returns the processor type string recorded in the stage's definition
 * (for example 'limit' or 'segmentGenerator').
 */
static stageType(stage: StageDefinition): string {
  const { processor } = stage.definition;
  return processor.type;
}
static stageWeight(stage: StageDefinition): number { static stageWeight(stage: StageDefinition): number {
return stage.definition.processor.type === 'limit' ? 0.1 : 1; return Stages.stageType(stage) === 'limit' ? 0.1 : 1;
} }
public readonly stages: StageDefinition[]; public readonly stages: StageDefinition[];
@ -214,6 +234,9 @@ export class Stages {
case 'shuffle': case 'shuffle':
return 'Shuffle output'; return 'Shuffle output';
case 'segmentGenerationProgress':
return 'Segment generation';
default: default:
if (counterName.startsWith('input')) { if (counterName.startsWith('input')) {
const inputIndex = Number(counterName.replace('input', '')); const inputIndex = Number(counterName.replace('input', ''));
@ -230,7 +253,7 @@ export class Stages {
} }
stageHasOutput(stage: StageDefinition): boolean { stageHasOutput(stage: StageDefinition): boolean {
return stage.definition.processor.type !== 'segmentGenerator'; return Stages.stageType(stage) !== 'segmentGenerator';
} }
stageHasSort(stage: StageDefinition): boolean { stageHasSort(stage: StageDefinition): boolean {
@ -287,13 +310,16 @@ export class Stages {
) / inputFileCount ) / inputFileCount
); );
} else { } else {
// Otherwise, base it on the stage input divided by the output of all non-broadcast input stages // Otherwise, base it on the stage input divided by the output of all non-broadcast input stages,
// use the segment generation counter in the special case of a segmentGenerator stage
return zeroDivide( return zeroDivide(
sum(input, (inputSource, i) => Stages.stageType(stage) === 'segmentGenerator'
inputSource.type === 'stage' && !broadcast?.includes(i) ? this.getTotalSegmentGenerationProgressForStage(stage, 'rowsPushed')
? this.getTotalCounterForStage(stage, `input${i}`, 'rows') : sum(input, (inputSource, i) =>
: 0, inputSource.type === 'stage' && !broadcast?.includes(i)
), ? this.getTotalCounterForStage(stage, `input${i}`, 'rows')
: 0,
),
sum(input, (inputSource, i) => sum(input, (inputSource, i) =>
inputSource.type === 'stage' && !broadcast?.includes(i) inputSource.type === 'stage' && !broadcast?.includes(i)
? this.getTotalOutputForStage(stages[inputSource.stage], 'rows') ? this.getTotalOutputForStage(stages[inputSource.stage], 'rows')
@ -400,6 +426,15 @@ export class Stages {
); );
} }
/**
 * Totals one segment generation progress field over the counters returned
 * for a stage.
 *
 * @param stage - the stage whose counters are summed
 * @param field - which row count to total
 * @returns the summed value; 0 when this instance has no counters. An entry
 *   without a segmentGenerationProgress counter (or with a falsy value for
 *   the field) contributes 0.
 */
getTotalSegmentGenerationProgressForStage(
  stage: StageDefinition,
  field: SegmentGenerationProgressFields,
): number {
  if (!this.counters) return 0;

  const countersForStage = this.getCountersForStage(stage);
  return sum(countersForStage, entry => {
    const progress = entry.segmentGenerationProgress;
    return (progress && progress[field]) || 0;
  });
}
getChannelCounterNamesForStage(stage: StageDefinition): ChannelCounterName[] { getChannelCounterNamesForStage(stage: StageDefinition): ChannelCounterName[] {
const { definition } = stage; const { definition } = stage;
@ -416,8 +451,7 @@ export class Stages {
const channelCounters = this.getChannelCounterNamesForStage(stage); const channelCounters = this.getChannelCounterNamesForStage(stage);
const forStageCounters = counters?.[stageNumber] || {}; const forStageCounters = counters?.[stageNumber] || {};
return Object.keys(forStageCounters).map(key => { return Object.entries(forStageCounters).map(([key, stageCounters]) => {
const stageCounters = forStageCounters[key];
const newWideCounter: SimpleWideCounter = { const newWideCounter: SimpleWideCounter = {
index: Number(key), index: Number(key),
}; };
@ -433,6 +467,7 @@ export class Stages {
} }
: zeroChannelFields(); : zeroChannelFields();
} }
newWideCounter.segmentGenerationProgress = stageCounters.segmentGenerationProgress;
return newWideCounter; return newWideCounter;
}); });
} }

View File

@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
import { IconNames } from '@blueprintjs/icons'; import { IconNames } from '@blueprintjs/icons';
import { RefName } from 'druid-query-toolkit'; import { T } from 'druid-query-toolkit';
import * as JSONBig from 'json-bigint-native'; import * as JSONBig from 'json-bigint-native';
import React, { useState } from 'react'; import React, { useState } from 'react';
@ -51,7 +51,7 @@ export const ExecutionDetailsPane = React.memo(function ExecutionDetailsPane(
return ( return (
<div> <div>
<p>{`General info for ${execution.id}${ <p>{`General info for ${execution.id}${
ingestDatasource ? ` ingesting into ${RefName.create(ingestDatasource, true)}` : '' ingestDatasource ? ` ingesting into ${T(ingestDatasource)}` : ''
}`}</p> }`}</p>
{execution.error && <ExecutionErrorPane execution={execution} />} {execution.error && <ExecutionErrorPane execution={execution} />}
{execution.stages ? ( {execution.stages ? (

View File

@ -31,6 +31,7 @@ import type {
ClusterBy, ClusterBy,
CounterName, CounterName,
Execution, Execution,
SegmentGenerationProgressFields,
SimpleWideCounter, SimpleWideCounter,
StageDefinition, StageDefinition,
} from '../../../druid-models'; } from '../../../druid-models';
@ -125,6 +126,8 @@ export const ExecutionStagesPane = React.memo(function ExecutionStagesPane(
...stages.getInputCountersForStage(stage, 'rows').map(formatRows), ...stages.getInputCountersForStage(stage, 'rows').map(formatRows),
formatRows(stages.getTotalCounterForStage(stage, 'output', 'rows')), formatRows(stages.getTotalCounterForStage(stage, 'output', 'rows')),
formatRows(stages.getTotalCounterForStage(stage, 'shuffle', 'rows')), formatRows(stages.getTotalCounterForStage(stage, 'shuffle', 'rows')),
formatRows(stages.getTotalSegmentGenerationProgressForStage(stage, 'rowsMerged')),
formatRows(stages.getTotalSegmentGenerationProgressForStage(stage, 'rowsPushed')),
]); ]);
const filesValues = filterMap(stages.stages, stage => { const filesValues = filterMap(stages.stages, stage => {
@ -164,6 +167,18 @@ export const ExecutionStagesPane = React.memo(function ExecutionStagesPane(
}); });
} }
const isSegmentGenerator = Stages.stageType(stage) === 'segmentGenerator';
let bracesSegmentRowsMerged: string[] = [];
let bracesSegmentRowsPushed: string[] = [];
if (isSegmentGenerator) {
bracesSegmentRowsMerged = wideCounters.map(wideCounter =>
formatRows(wideCounter.segmentGenerationProgress?.rowsMerged || 0),
);
bracesSegmentRowsPushed = wideCounters.map(wideCounter =>
formatRows(wideCounter.segmentGenerationProgress?.rowsPushed || 0),
);
}
return ( return (
<ReactTable <ReactTable
className="detail-counters-for-workers" className="detail-counters-for-workers"
@ -236,6 +251,30 @@ export const ExecutionStagesPane = React.memo(function ExecutionStagesPane(
}, },
}; };
}), }),
Stages.stageType(stage) === 'segmentGenerator'
? [
{
Header: twoLines('Merged', <i>rows</i>),
id: 'segmentGeneration_rowsMerged',
accessor: d => d.segmentGenerationProgress?.rowsMerged || 0,
className: 'padded',
width: 180,
Cell({ value }) {
return <BracedText text={formatRows(value)} braces={bracesSegmentRowsMerged} />;
},
},
{
Header: twoLines('Pushed', <i>rows</i>),
id: 'segmentGeneration_rowsPushed',
accessor: d => d.segmentGenerationProgress?.rowsPushed || 0,
className: 'padded',
width: 180,
Cell({ value }) {
return <BracedText text={formatRows(value)} braces={bracesSegmentRowsPushed} />;
},
},
]
: [],
)} )}
/> />
); );
@ -420,6 +459,22 @@ ${title} uncompressed size: ${formatBytesCompact(
); );
} }
// Renders the stage-wide total of one segment generation row count inside a
// "data-transfer" cell, using the shared rowsValues braces.
// Returns undefined when the stage has no segmentGenerationProgress counter.
function dataProcessedSegmentGeneration(
  stage: StageDefinition,
  field: SegmentGenerationProgressFields,
) {
  const hasProgressCounter = stages.hasCounterForStage(stage, 'segmentGenerationProgress');
  if (!hasProgressCounter) return;

  const totalRows = stages.getTotalSegmentGenerationProgressForStage(stage, field);
  return (
    <div className="data-transfer">
      <BracedText text={formatRows(totalRows)} braces={rowsValues} />
    </div>
  );
}
return ( return (
<ReactTable <ReactTable
className={classNames('execution-stages-pane', DEFAULT_TABLE_CLASS_NAME)} className={classNames('execution-stages-pane', DEFAULT_TABLE_CLASS_NAME)}
@ -510,10 +565,17 @@ ${title} uncompressed size: ${formatBytesCompact(
<> <>
<div className="counter-spacer extend-right" /> <div className="counter-spacer extend-right" />
<div>{stages.getStageCounterTitle(stage, 'output')}</div> <div>{stages.getStageCounterTitle(stage, 'output')}</div>
{stages.hasCounterForStage(stage, 'shuffle') && (
<div>{stages.getStageCounterTitle(stage, 'shuffle')}</div>
)}
</> </>
)} )}
{stages.hasCounterForStage(stage, 'shuffle') && ( {stages.hasCounterForStage(stage, 'segmentGenerationProgress') && (
<div>{stages.getStageCounterTitle(stage, 'shuffle')}</div> <>
<div className="counter-spacer extend-right" />
<div>Merged</div>
<div>Pushed</div>
</>
)} )}
</> </>
); );
@ -536,10 +598,19 @@ ${title} uncompressed size: ${formatBytesCompact(
: dataProcessedInput(stage, i), : dataProcessedInput(stage, i),
)} )}
{stages.hasCounterForStage(stage, 'output') && ( {stages.hasCounterForStage(stage, 'output') && (
<div className="counter-spacer extend-left" /> <>
<div className="counter-spacer extend-left" />
{dataProcessedOutput(stage)}
{dataProcessedShuffle(stage)}
</>
)}
{stages.hasCounterForStage(stage, 'segmentGenerationProgress') && (
<>
<div className="counter-spacer extend-left" />
{dataProcessedSegmentGeneration(stage, 'rowsMerged')}
{dataProcessedSegmentGeneration(stage, 'rowsPushed')}
</>
)} )}
{dataProcessedOutput(stage)}
{dataProcessedShuffle(stage)}
</> </>
); );
}, },

View File

@ -5,7 +5,7 @@ exports[`IngestSuccessPane matches snapshot 1`] = `
className="ingest-success-pane" className="ingest-success-pane"
> >
<p> <p>
465,346 rows inserted into 'kttm_simple'. 465,346 rows inserted into "kttm_simple".
</p> </p>
<p> <p>
Insert query took 0:00:09. Insert query took 0:00:09.

View File

@ -56,7 +56,7 @@ export const IngestSuccessPane = React.memo(function IngestSuccessPane(
return ( return (
<div className="ingest-success-pane"> <div className="ingest-success-pane">
<p> <p>
{`${rows < 0 ? 'Data' : pluralIfNeeded(rows, 'row')} inserted into '${datasource}'.`} {`${rows < 0 ? 'Data' : pluralIfNeeded(rows, 'row')} inserted into ${T(datasource)}.`}
{warnings > 0 && ( {warnings > 0 && (
<> <>
{' '} {' '}