Web console: Fix the supervisor offset reset dialog. (#16298)

* Add host to query output

* Init fixes for reset offsets

* fix the supervisor offset reset dialog

* Update web-console/src/views/load-data-view/load-data-view.tsx

Co-authored-by: Katya Macedo  <38017980+ektravel@users.noreply.github.com>

* Update web-console/src/views/load-data-view/load-data-view.tsx

Co-authored-by: Katya Macedo  <38017980+ektravel@users.noreply.github.com>

* Update web-console/src/views/load-data-view/load-data-view.tsx

Co-authored-by: Katya Macedo  <38017980+ektravel@users.noreply.github.com>

* reformat code

* &apos;

* fix conflict

---------

Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com>
This commit is contained in:
Vadim Ogievetsky 2024-04-19 17:25:46 -07:00 committed by GitHub
parent ad5701e891
commit 3e42ebbaea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 244 additions and 52 deletions

View File

@ -100,6 +100,7 @@ export const QueryErrorPane = React.memo(function QueryErrorPane(props: QueryErr
</p>
)}
{error.errorClass && <p>{error.errorClass}</p>}
{error.host && <p>{`Host: ${error.host}`}</p>}
</div>
);
});

View File

@ -26,7 +26,7 @@
max-height: 80vh;
}
.label-button {
pointer-events: none;
.new-offset-label {
margin: 4px 9px 0 0 !important;
}
}

View File

@ -16,32 +16,92 @@
* limitations under the License.
*/
import { Button, Classes, Code, ControlGroup, Dialog, FormGroup, Intent } from '@blueprintjs/core';
import {
Button,
Classes,
ControlGroup,
Dialog,
FormGroup,
Intent,
Label,
Tag,
} from '@blueprintjs/core';
import React, { useState } from 'react';
import { Loader } from '../../components';
import { type FormJsonTabs, FormJsonSelector, JsonInput, Loader } from '../../components';
import { FancyNumericInput } from '../../components/fancy-numeric-input/fancy-numeric-input';
import type { SupervisorOffsetMap, SupervisorStatus } from '../../druid-models';
import { useQueryManager } from '../../hooks';
import { Api, AppToaster } from '../../singletons';
import { deepDelete, deepGet, getDruidErrorMessage } from '../../utils';
import {
deepDelete,
deepGet,
formatInteger,
getDruidErrorMessage,
isNumberLike,
} from '../../utils';
import './supervisor-reset-offsets-dialog.scss';
type OffsetMap = Record<string, number>;
function numberOrUndefined(x: any): number | undefined {
if (typeof x === 'undefined') return;
return Number(x);
}
interface PartitionEntry {
partition: string;
currentOffset?: number;
}
function getPartitionEntries(
supervisorStatus: SupervisorStatus,
partitionOffsetMap: SupervisorOffsetMap,
): PartitionEntry[] {
const latestOffsets = supervisorStatus.payload?.latestOffsets;
const minimumLag = supervisorStatus.payload?.minimumLag;
let partitions: PartitionEntry[];
if (latestOffsets && minimumLag) {
partitions = Object.entries(latestOffsets).map(([partition, latestOffset]) => {
return {
partition,
currentOffset: Number(latestOffset) - Number(minimumLag[partition] || 0),
};
});
} else {
partitions = [];
const numPartitions = supervisorStatus.payload?.partitions;
for (let p = 0; p < numPartitions; p++) {
partitions.push({ partition: String(p) });
}
}
Object.keys(partitionOffsetMap).forEach(p => {
if (partitions.some(({ partition }) => partition === p)) return;
partitions.push({ partition: p });
});
partitions.sort((a, b) => {
return a.partition.localeCompare(b.partition, undefined, { numeric: true });
});
return partitions;
}
interface SupervisorResetOffsetsDialogProps {
supervisorId: string;
supervisorType: string;
onClose: () => void;
onClose(): void;
}
export const SupervisorResetOffsetsDialog = React.memo(function SupervisorResetOffsetsDialog(
props: SupervisorResetOffsetsDialogProps,
) {
const { supervisorId, supervisorType, onClose } = props;
const [offsetsToResetTo, setOffsetsToResetTo] = useState<OffsetMap>({});
const [partitionOffsetMap, setPartitionOffsetMap] = useState<SupervisorOffsetMap>({});
const [currentTab, setCurrentTab] = useState<FormJsonTabs>('form');
const [jsonError, setJsonError] = useState<Error | undefined>();
const disableSubmit = Boolean(jsonError);
const [statusResp] = useQueryManager<string, OffsetMap>({
const [statusResp] = useQueryManager<string, SupervisorStatus>({
initQuery: supervisorId,
processQuery: async supervisorId => {
const statusResp = await Api.instance.get(
@ -51,13 +111,15 @@ export const SupervisorResetOffsetsDialog = React.memo(function SupervisorResetO
},
});
const stream = deepGet(statusResp.data || {}, 'payload.stream');
const latestOffsets = deepGet(statusResp.data || {}, 'payload.latestOffsets');
const latestOffsetsEntries = latestOffsets ? Object.entries(latestOffsets) : undefined;
// Kafka: Topic, Partition, Offset
// Kinesis: Stream, Shard, Sequence number
const partitionLabel = supervisorType === 'kinesis' ? 'Shard' : 'Partition';
const offsetLabel = supervisorType === 'kinesis' ? 'sequence number' : 'offset';
async function onSave() {
async function onSubmit() {
const stream = deepGet(statusResp.data || {}, 'payload.stream');
if (!stream) return;
if (!Object.keys(offsetsToResetTo).length) return;
if (!Object.keys(partitionOffsetMap).length) return;
try {
await Api.instance.post(
@ -67,68 +129,112 @@ export const SupervisorResetOffsetsDialog = React.memo(function SupervisorResetO
partitions: {
type: 'end',
stream,
partitionOffsetMap: offsetsToResetTo,
partitionOffsetMap,
},
},
);
} catch (e) {
AppToaster.show({
message: `Failed to set offsets: ${getDruidErrorMessage(e)}`,
message: `Failed to set ${offsetLabel}s: ${getDruidErrorMessage(e)}`,
intent: Intent.DANGER,
});
return;
}
AppToaster.show({
message: `${supervisorId} offsets have been set`,
message: (
<>
<Tag minimal>{supervisorId}</Tag> {offsetLabel}s have been set.
</>
),
intent: Intent.SUCCESS,
});
onClose();
}
const supervisorStatus = statusResp.data;
return (
<Dialog
className="supervisor-reset-offsets-dialog"
isOpen
onClose={onClose}
title={`Set supervisor offsets: ${supervisorId}`}
title={`Set supervisor ${offsetLabel}s`}
>
<div className={Classes.DIALOG_BODY}>
{statusResp.loading && <Loader />}
{latestOffsetsEntries && (
<>
<p>
Set <Code>{supervisorId}</Code> to specific offsets
Set <Tag minimal>{supervisorId}</Tag> to read from specific {offsetLabel}s.
</p>
{latestOffsetsEntries.map(([key, latestOffset]) => (
<FormGroup key={key} label={key} helperText={`(currently: ${latestOffset})`}>
<FormJsonSelector
tab={currentTab}
onChange={t => {
setJsonError(undefined);
setCurrentTab(t);
}}
/>
{currentTab === 'form' ? (
<>
{statusResp.loading && <Loader />}
{supervisorStatus &&
getPartitionEntries(supervisorStatus, partitionOffsetMap).map(
({ partition, currentOffset }) => (
<FormGroup
key={partition}
label={`${partitionLabel} ${partition}${
typeof currentOffset === 'undefined'
? ''
: ` (current ${offsetLabel}=${formatInteger(currentOffset)})`
}:`}
>
<ControlGroup>
<Button className="label-button" text="New offset:" disabled />
<Label className="new-offset-label">{`New ${offsetLabel}:`}</Label>
<FancyNumericInput
value={offsetsToResetTo[key]}
value={numberOrUndefined(partitionOffsetMap[partition])}
onValueChange={valueAsNumber => {
setOffsetsToResetTo({ ...offsetsToResetTo, [key]: valueAsNumber });
setPartitionOffsetMap({
...partitionOffsetMap,
[partition]: valueAsNumber,
});
}}
onValueEmpty={() => {
setOffsetsToResetTo(deepDelete(offsetsToResetTo, key));
setPartitionOffsetMap(deepDelete(partitionOffsetMap, partition));
}}
min={0}
fill
placeholder="Don't change offset"
placeholder={`Don't change ${offsetLabel}`}
/>
</ControlGroup>
</FormGroup>
))}
{latestOffsetsEntries.length === 0 && (
<p>There are no partitions currently in this supervisor.</p>
),
)}
</>
) : (
<JsonInput
value={partitionOffsetMap}
onChange={setPartitionOffsetMap}
setError={setJsonError}
issueWithValue={value => {
if (!value || typeof value !== 'object') {
return `The ${offsetLabel} map must be an object`;
}
const badValue = Object.entries(value).find(([_, v]) => !isNumberLike(v));
if (badValue) {
return `The value of ${badValue[0]} is not a number`;
}
return;
}}
height="300px"
/>
)}
</div>
<div className={Classes.DIALOG_FOOTER}>
<div className={Classes.DIALOG_FOOTER_ACTIONS}>
<Button text="Close" onClick={onClose} />
<Button text="Save" intent={Intent.PRIMARY} onClick={() => void onSave()} />
<Button
text="Submit"
intent={Intent.PRIMARY}
disabled={disableSubmit}
onClick={() => void onSubmit()}
/>
</div>
</div>
</Dialog>

View File

@ -36,6 +36,7 @@ export * from './metric-spec/metric-spec';
export * from './overlord-dynamic-config/overlord-dynamic-config';
export * from './query-context/query-context';
export * from './stages/stages';
export * from './supervisor-status/supervisor-status';
export * from './task/task';
export * from './time/time';
export * from './timestamp-spec/timestamp-spec';

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import type { NumberLike } from '../../utils';
export type SupervisorOffsetMap = Record<string, NumberLike>;
export interface SupervisorStatus {
generationTime: string;
id: string;
payload: {
dataSource: string;
stream: string;
partitions: number;
replicas: number;
durationSeconds: number;
activeTasks: SupervisorStatusTask[];
publishingTasks: SupervisorStatusTask[];
latestOffsets?: SupervisorOffsetMap;
minimumLag?: SupervisorOffsetMap;
aggregateLag: number;
offsetsLastUpdated: string;
suspended: boolean;
healthy: boolean;
state: string;
detailedState: string;
recentErrors: any[];
};
}
export interface SupervisorStatusTask {
id: string;
startingOffsets: SupervisorOffsetMap;
startTime: '2024-04-12T21:35:34.834Z';
remainingSeconds: number;
type: string;
currentOffsets: SupervisorOffsetMap;
lag: SupervisorOffsetMap;
}

View File

@ -26,19 +26,22 @@ export interface BasicAction {
title: string;
intent?: Intent;
onAction: () => void;
disabledReason?: string;
}
export function basicActionsToMenu(basicActions: BasicAction[]): JSX.Element | undefined {
if (!basicActions.length) return;
return (
<Menu>
{basicActions.map((action, i) => (
{basicActions.map(({ icon, title, intent, onAction, disabledReason }, i) => (
<MenuItem
key={i}
icon={action.icon}
text={action.title}
intent={action.intent}
onClick={action.onAction}
icon={icon}
text={title}
intent={intent}
onClick={onAction}
disabled={Boolean(disabledReason)}
title={disabledReason}
/>
))}
</Menu>

View File

@ -33,6 +33,11 @@ export const EMPTY_ARRAY: any[] = [];
export type NumberLike = number | bigint;
export function isNumberLike(x: unknown): x is NumberLike {
const t = typeof x;
return t === 'number' || t === 'bigint';
}
export function isNumberLikeNaN(x: NumberLike): boolean {
return isNaN(Number(x));
}

View File

@ -3246,6 +3246,26 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
</>
),
},
{
name: 'suspended',
type: 'boolean',
defined: isStreamingSpec,
defaultValue: false,
info: (
<>
<p>Create a supervisor in a suspended state.</p>
<p>
Creating a supervisor in a suspended state can be helpful if you are not yet
ready to begin ingesting data or if you prefer to configure the
supervisor&apos;s metadata before starting it.
</p>
<p>
You can configure the exact offsets that the supervisor will read from using
the <Code>Actions</Code> menu on the <Code>Supervisors</Code> tab.
</p>
</>
),
},
]}
model={spec}
onChange={this.updateSpec}

View File

@ -343,14 +343,16 @@ GROUP BY 1, 2`;
},
{
icon: IconNames.STEP_BACKWARD,
title: 'Set offsets',
title: `Set ${type === 'kinesis' ? 'sequence numbers' : 'offsets'}`,
onAction: () => this.setState({ resetOffsetsSupervisorInfo: { id, type } }),
disabledReason: supervisorSuspended ? undefined : `Supervisor must be suspended`,
},
{
icon: IconNames.STEP_BACKWARD,
title: 'Hard reset',
intent: Intent.DANGER,
onAction: () => this.setState({ resetSupervisorId: id }),
disabledReason: supervisorSuspended ? undefined : `Supervisor must be suspended`,
},
{
icon: IconNames.CROSS,